</parent>
<artifactId>features-netconf</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-auth</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-notifications-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-notifications-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>ietf-netconf-monitoring</artifactId>
<groupId>org.opendaylight.controller</groupId>
<artifactId>ietf-netconf-monitoring-extension</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf-notifications</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools.model</groupId>
<artifactId>ietf-inet-types</artifactId>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-monitoring</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ <version>${yangtools.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- dependency for opendaylight-karaf-empty for use by testing -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-empty</artifactId>
+ <version>${commons.opendaylight.version}</version>
+ <type>zip</type>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire.version}</version>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
</feature>
<feature name='odl-netconf-notifications-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring :: Impl">
<feature version='${project.version}'>odl-netconf-notifications-api</feature>
+ <feature version='${project.version}'>odl-netconf-util</feature>
+ <feature version='${yangtools.version}'>odl-yangtools-binding-generator</feature>
<bundle>mvn:org.opendaylight.controller/netconf-notifications-impl/${project.version}</bundle>
</feature>
* @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt
*
* @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
- * success if it indicates no further attempts should be made and failure if it reports an error
+ * success is never reported, only failure when it runs out of reconnection attempts.
*/
protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
final PipelineInitializer<S> initializer) {
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
}
});
+
+ pending.addListener(new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ ReconnectPromise.this.setFailure(future.cause());
+ }
+ }
+ });
}
/**
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientActor extends UntypedActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
private final ActorRef target;
try {
bs = fromObject(state);
} catch (Exception e) {
- LOG.error(e, "Exception in creating snapshot");
+ LOG.error("Exception in creating snapshot", e);
}
getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
}
try {
state.putAll((HashMap) toObject(snapshot));
} catch (Exception e) {
- LOG.error(e, "Exception in applying snapshot");
+ LOG.error("Exception in applying snapshot", e);
}
if(LOG.isDebugEnabled()) {
LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* The current state determines the current behavior of a RaftActor
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
- persistenceId());
+ LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId(), saveSnapshotFailure.cause());
context.getReplicatedLog().snapshotRollback();
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.event.LoggingAdapter;
-
import java.util.Map;
+import org.slf4j.Logger;
/**
* The RaftActorContext contains that portion of the RaftActors state that
*
* @return
*/
- LoggingAdapter getLogger();
+ Logger getLogger();
/**
* Get a mapping of peerId's to their addresses
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkState;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
-import akka.event.LoggingAdapter;
import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
public class RaftActorContextImpl implements RaftActorContext {
private final Map<String, String> peerAddresses;
- private final LoggingAdapter LOG;
+ private final Logger LOG;
private final ConfigParams configParams;
ElectionTerm termInformation, long commitIndex,
long lastApplied, ReplicatedLog replicatedLog,
Map<String, String> peerAddresses, ConfigParams configParams,
- LoggingAdapter logger) {
+ Logger logger) {
this.actor = actor;
this.context = context;
this.id = id;
return context.system();
}
- @Override public LoggingAdapter getLogger() {
+ @Override public Logger getLogger() {
return this.LOG;
}
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ sendAppendEntries(0);
}
/**
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
- sendAppendEntries();
+ sendAppendEntries(0);
}
}
- private void sendAppendEntries() {
+ private void sendAppendEntries(long timeSinceLastActivityInterval) {
// Send an AppendEntries to all followers
- long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
final FollowerLogInformation followerLogInformation = e.getValue();
// This checks helps not to send a repeat message to the follower
- if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+ if(!followerLogInformation.isFollowerActive() ||
+ followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
sendUpdatesToFollower(followerId, followerLogInformation, true);
}
}
followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
+ LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
}
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- sendAppendEntries();
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
}
}
import akka.actor.ActorRef;
import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
/**
/**
*
*/
- protected final LoggingAdapter LOG;
+ protected final Logger LOG;
/**
*
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- LOG.warning(
+ LOG.warn(
"{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
context.getId(), i, i, index);
break;
try {
close();
} catch (Exception e) {
- LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
+ LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e);
}
return behavior;
snapshotTracker = null;
} catch (Exception e){
- LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
+ LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e);
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
package org.opendaylight.controller.cluster.raft.behaviors;
-import akka.event.LoggingAdapter;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
/**
* SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
*/
public class SnapshotTracker {
- private final LoggingAdapter LOG;
+ private final Logger LOG;
private final int totalChunks;
private ByteString collectedChunks = ByteString.EMPTY;
private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
private boolean sealed = false;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+ SnapshotTracker(Logger LOG, int totalChunks){
this.LOG = LOG;
this.totalChunks = totalChunks;
}
}
public static class InvalidChunkException extends Exception {
+ private static final long serialVersionUID = 1L;
+
InvalidChunkException(String message){
super(message);
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.io.Serializable;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MockRaftActorContext implements RaftActorContext {
public void initReplicatedLog(){
this.replicatedLog = new SimpleReplicatedLog();
- this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
+ this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1")));
+ this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2")));
}
@Override public ActorRef actorOf(Props props) {
return this.system;
}
- @Override public LoggingAdapter getLogger() {
- return Logging.getLogger(system, this);
+ @Override public Logger getLogger() {
+ return LoggerFactory.getLogger(getClass());
}
@Override public Map<String, String> getPeerAddresses() {
package org.opendaylight.controller.cluster.raft;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
public class RaftActorTest extends AbstractActorTest {
// simulate a real snapshot
leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
+ leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId())
+ , RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
//reply from a slow follower does not initiate a fake snapshot
leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.impl.SimpleLogger;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
public class LeaderTest extends AbstractRaftActorBehaviorTest {
+ static {
+ // This enables trace logging for the tests.
+ System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
+ }
+
private final ActorRef leaderActor =
getSystem().actorOf(Props.create(DoNothingActor.class));
private final ActorRef senderActor =
@Test
public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
ActorRef followerActor = getTestActor();
MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ String followerId = "follower";
+ peerAddresses.put(followerId, followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
Leader leader = new Leader(actorContext);
- leader.markFollowerActive(followerActor.path().toString());
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
- leader.handleMessage(senderActor, new SendHeartBeat());
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ // Leader should send an immediate heartbeat with no entries as follower is inactive.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getTerm", term, appendEntries.getTerm());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
- assertEquals("match", out);
+ // The follower would normally reply - simulate that explicitly here.
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ followerId, term, true, lastIndex - 1, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+ getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+ leader.handleMessage(senderActor, new SendHeartBeat());
+
+ appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
};
}};
@Test
public void testHandleReplicateMessageSendAppendEntriesToFollower() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ String followerId = "follower";
+ peerAddresses.put(followerId, followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
Leader leader = new Leader(actorContext);
- leader.markFollowerActive(followerActor.path().toString());
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, null,
- new MockRaftActorContext.MockReplicatedLogEntry(1,
- 100,
- new MockRaftActorContext.MockPayload("foo"))
- ));
+
+ // Leader will send an immediate heartbeat - ignore it.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ followerId, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, lastIndex + 1, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new Replicate(null, null, newEntry));
// State should not change
assertTrue(raftBehavior instanceof Leader);
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
+ AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
};
}};
@Test
public void testHandleReplicateMessageWhenThereAreNoFollowers() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+ AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
followerActor, AppendEntries.class);
assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
leader.handleMessage(senderActor, new SendHeartBeat());
- InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
- MessageCollectorActor.getFirstMatching(followerActor,
- InstallSnapshot.SERIALIZABLE_CLASS);
+ InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
+ InstallSnapshot.SERIALIZABLE_CLASS);
assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
isproto);
RaftActorBehavior raftBehavior = leader.handleMessage(
leaderActor, new InitiateInstallSnapshot());
- CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+ CaptureSnapshot cs = MessageCollectorActor.
getFirstMatching(leaderActor, CaptureSnapshot.class);
assertNotNull(cs);
Leader leader = new Leader(actorContext);
+ // Ignore initial heartbeat.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
MockLeader leader = new MockLeader(actorContext);
+ // Ignore initial heartbeat.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+ MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
+ followerActor.underlyingActor().clear();
leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
followerActor.path().toString(), -1, false));
leader.handleMessage(leaderActor, new SendHeartBeat());
- o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
new JavaTestKit(getSystem()) {
{
-
TestActorRef<MessageCollectorActor> followerActor =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
int hashCode = installSnapshot.getData().hashCode();
- leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ followerActor.underlyingActor().clear();
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
@Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
- return new MockRaftActorContext("test", getSystem(), actorRef);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+ context.setConfigParams(configParams);
+ return context;
}
private ByteString toByteString(Map<String, String> state) {
}
public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- private static AbstractRaftActorBehavior behavior;
-
- public ForwardMessageToBehaviorActor(){
-
- }
+ AbstractRaftActorBehavior behavior;
@Override public void onReceive(Object message) throws Exception {
+ if(behavior != null) {
+ behavior.handleMessage(sender(), message);
+ }
+
super.onReceive(message);
- behavior.handleMessage(sender(), message);
}
- public static void setBehavior(AbstractRaftActorBehavior behavior){
- ForwardMessageToBehaviorActor.behavior = behavior;
+ public static Props props() {
+ return Props.create(ForwardMessageToBehaviorActor.class);
}
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(ForwardMessageToBehaviorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
- ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ new MockRaftActorContext("follower", getSystem(), followerActor);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ peerAddresses.put("follower", followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
//create 3 entries
leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
leaderActorContext.setCommitIndex(1);
// follower too has the exact same log entries and has the same commit index
followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
followerActorContext.setCommitIndex(1);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
-
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
leaderActor, AppendEntriesReply.class);
-
assertNotNull(appendEntriesReply);
- // follower returns its next index
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
}};
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(ForwardMessageToBehaviorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
- ActorRef followerActor = getSystem().actorOf(
- Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ new MockRaftActorContext("follower", getSystem(), followerActor);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ peerAddresses.put("follower", followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
leaderActorContext.setCommitIndex(1);
followerActorContext.getReplicatedLog().removeFrom(0);
followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// follower has the same log entries but its commit index > leaders commit index
followerActorContext.setCommitIndex(2);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
-
- Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntries.class);
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ leaderActor.underlyingActor().behavior = leader;
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
assertNotNull(appendEntriesReply);
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(1, followerActorContext.getCommitIndex());
}};
}
assertEquals(2, leaderActorContext.getCommitIndex());
ApplyLogEntries applyLogEntries =
- (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
- ApplyLogEntries.class);
+ MessageCollectorActor.getFirstMatching(leaderActor,
+ ApplyLogEntries.class);
assertNotNull(applyLogEntries);
@Test
public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
leaderActorContext.setConfigParams(configParams);
- ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
followerActorContext.setConfigParams(configParams);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put("follower-reply",
- followerActor.path().toString());
+ followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
- //create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-
- leaderActorContext.setCommitIndex(1);
+ followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive("follower-reply");
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+ System.out.println("appendEntriesReply: "+appendEntriesReply);
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ // Clear initial heartbeat messages
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ // create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
- .getFirstMatching(followerActor, AppendEntries.class);
-
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
+ // Should send first log entry
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
- assertEquals(0, appendEntries.getPrevLogIndex());
-
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(-1, appendEntries.getPrevLogIndex());
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
assertNotNull(appendEntriesReply);
- leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
- List<Object> entries = ForwardMessageToBehaviorActor
- .getAllMatching(followerActor, AppendEntries.class);
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(0, appendEntriesReply.getLogLastIndex());
- assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+ followerActor.underlyingActor().clear();
- AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
- assertEquals(1, appendEntriesSecond.getLeaderCommit());
- assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
- assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+ // Should send second log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
}};
}
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import akka.event.LoggingAdapter;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SnapshotTrackerTest {
+ Logger logger = LoggerFactory.getLogger(getClass());
+
Map<String, String> data;
ByteString byteString;
ByteString chunk1;
@Test
public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
- SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
// Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
- SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker2 = new SnapshotTracker(logger, 2);
tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
}
// The first chunk's index must at least be FIRST_CHUNK_INDEX
- SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker3 = new SnapshotTracker(logger, 2);
try {
tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
}
// Out of sequence chunk indexes won't work
- SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker4 = new SnapshotTracker(logger, 2);
tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// No exceptions will be thrown when invalid chunk is added with the right sequence
// If the lastChunkHashCode is missing
- SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker5 = new SnapshotTracker(logger, 2);
tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// Look I can add the same chunk again
// An exception will be thrown when an invalid chunk is addedd with the right sequence
// when the lastChunkHashCode is present
- SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker6 = new SnapshotTracker(logger, 2);
tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
// Trying to get a snapshot before all chunks have been received will throw an exception
- SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
try {
}
- SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+ SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
@Test
public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
- SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
ByteString chunks = chunk1.concat(chunk2);
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MessageCollectorActor extends UntypedActor {
- private List<Object> messages = new ArrayList<>();
+ private final List<Object> messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List<Object> getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
* @param clazz
* @return
*/
- public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
- List<Object> allMessages = getAllMessages(actor);
+ public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
+ for(int i = 0; i < 50; i++) {
+ List<Object> allMessages = getAllMessages(actor);
- for(Object message : allMessages){
- if(message.getClass().equals(clazz)){
- return message;
+ for(Object message : allMessages){
+ if(message.getClass().equals(clazz)){
+ return (T) message;
+ }
}
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
return null;
package org.opendaylight.controller.cluster.common.actor;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractUntypedActor extends UntypedActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
public AbstractUntypedActor() {
if(LOG.isDebugEnabled()) {
package org.opendaylight.controller.cluster.common.actor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
public AbstractUntypedPersistentActor() {
if(LOG.isDebugEnabled()) {
try {
procedure.apply(o);
} catch (Exception e) {
- LOG.error(e, "An unexpected error occurred");
+ LOG.error("An unexpected error occurred", e);
}
}
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
</dependencies>
<build>
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.text.WordUtils;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
*/
public class DatastoreContext {
- private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private final Duration shardTransactionIdleTimeout;
- private final int operationTimeoutInSeconds;
- private final String dataStoreMXBeanType;
- private final ConfigParams shardRaftConfig;
- private final int shardTransactionCommitTimeoutInSeconds;
- private final int shardTransactionCommitQueueCapacity;
- private final Timeout shardInitializationTimeout;
- private final Timeout shardLeaderElectionTimeout;
- private final boolean persistent;
- private final ConfigurationReader configurationReader;
- private final long shardElectionTimeoutFactor;
-
- private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
- ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
- Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
- int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
- Timeout shardLeaderElectionTimeout,
- boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
- this.dataStoreProperties = dataStoreProperties;
- this.shardRaftConfig = shardRaftConfig;
- this.dataStoreMXBeanType = dataStoreMXBeanType;
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
- this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
- this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
- this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
- this.shardInitializationTimeout = shardInitializationTimeout;
- this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
- this.persistent = persistent;
- this.configurationReader = configurationReader;
- this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+ public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5;
+ public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
+ public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+ public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+ public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
+ public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+ public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+ public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+ public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+ public static final boolean DEFAULT_PERSISTENT = true;
+ public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader();
+ public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+ public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+ public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
+ public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+
+ private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+ private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+ private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+ private String dataStoreMXBeanType;
+ private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+ private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+ private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+ private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+ private boolean persistent = DEFAULT_PERSISTENT;
+ private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
+ private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+ private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+
+ private DatastoreContext(){
+ setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
+ setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+ setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
+ setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
+ setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
}
public static Builder newBuilder() {
}
public ConfigParams getShardRaftConfig() {
- return shardRaftConfig;
+ return raftConfig;
}
public int getShardTransactionCommitTimeoutInSeconds() {
}
public long getShardElectionTimeoutFactor(){
- return this.shardElectionTimeoutFactor;
+ return raftConfig.getElectionTimeoutFactor();
+ }
+
+ public String getDataStoreType(){
+ return dataStoreType;
+ }
+
+ public long getTransactionCreationInitialRateLimit() {
+ return transactionCreationInitialRateLimit;
+ }
+
+ private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+ raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+ TimeUnit.MILLISECONDS));
+ }
+
+ private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+ raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+ }
+
+
+ private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+ raftConfig.setIsolatedLeaderCheckInterval(
+ new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+ }
+
+ private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
+ }
+
+ private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
+ }
+
+ private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+ raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
public static class Builder {
- private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
- private int operationTimeoutInSeconds = 5;
- private String dataStoreMXBeanType;
- private int shardTransactionCommitTimeoutInSeconds = 30;
- private int shardJournalRecoveryLogBatchSize = 1000;
- private int shardSnapshotBatchCount = 20000;
- private int shardHeartbeatIntervalInMillis = 500;
- private int shardTransactionCommitQueueCapacity = 20000;
- private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
- private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
- private boolean persistent = true;
- private ConfigurationReader configurationReader = new FileConfigurationReader();
- private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
- private int shardSnapshotDataThresholdPercentage = 12;
- private long shardElectionTimeoutFactor = 2;
+ private DatastoreContext datastoreContext = new DatastoreContext();
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
- this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+ datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+ datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
return this;
}
public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
- this.dataStoreMXBeanType = dataStoreMXBeanType;
+ datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
return this;
}
public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
- this.dataStoreProperties = dataStoreProperties;
+ datastoreContext.dataStoreProperties = dataStoreProperties;
return this;
}
public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
- this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+ datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
return this;
}
public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
- this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize;
+ datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
return this;
}
public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
- this.shardSnapshotBatchCount = shardSnapshotBatchCount;
+ datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
return this;
}
public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
- this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+ datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
}
-
public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
- this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
+ datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
}
public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
- this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+ datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
return this;
}
public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
- this.shardInitializationTimeout = new Timeout(timeout, unit);
+ datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
return this;
}
public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
- this.shardLeaderElectionTimeout = new Timeout(timeout, unit);
+ datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
return this;
}
public Builder configurationReader(ConfigurationReader configurationReader){
- this.configurationReader = configurationReader;
+ datastoreContext.configurationReader = configurationReader;
return this;
}
public Builder persistent(boolean persistent){
- this.persistent = persistent;
+ datastoreContext.persistent = persistent;
return this;
}
public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
- this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+ datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
return this;
}
public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
- this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
+ public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+ datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
+ return this;
+ }
- public DatastoreContext build() {
- DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
- raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
- TimeUnit.MILLISECONDS));
- raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
- raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
- raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
- raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
- raftConfig.setIsolatedLeaderCheckInterval(
- new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+ public Builder dataStoreType(String dataStoreType){
+ datastoreContext.dataStoreType = dataStoreType;
+ datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+ return this;
+ }
- return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
- operationTimeoutInSeconds, shardTransactionIdleTimeout,
- shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
- shardInitializationTimeout, shardLeaderElectionTimeout,
- persistent, configurationReader, shardElectionTimeoutFactor);
+ public DatastoreContext build() {
+ return datastoreContext;
}
}
}
private final ActorContext actorContext;
- public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+ public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+ String type = datastoreContext.getDataStoreType();
+
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
- ShardManager.props(type, cluster, configuration, datastoreContext)
+ ShardManager.props(cluster, configuration, datastoreContext)
.withMailbox(ActorContext.MAILBOX), shardManagerId ),
cluster, configuration, datastoreContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
}
private static volatile ActorSystem persistentActorSystem = null;
- public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+ public static DistributedDataStore createInstance(SchemaService schemaService,
DatastoreContext datastoreContext, BundleContext bundleContext) {
ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
config, datastoreContext);
ShardStrategyFactory.setConfiguration(config);
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
-
/// The name of this shard
private final ShardIdentifier name;
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
- persistenceId());
+ LOG.error("{}: Recovery failed because of this cause",
+ persistenceId(), ((RecoveryFailure) message).cause());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
new ModificationPayload(cohortEntry.getModification()));
}
} catch (Exception e) {
- LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
- persistenceId(), cohortEntry.getTransactionID());
+ LOG.error("{} An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID(), e);
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} catch (Exception e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
+ LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ transactionID, e);
shardMBean.incrementFailedTransactionsCount();
} finally {
commitCoordinator.currentTransactionComplete(transactionID, true);
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "{}: An exception happened during abort", persistenceId());
+ LOG.error("{}: An exception happened during abort", persistenceId(), t);
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "{}: Failed to commit", persistenceId());
+ LOG.error("{}: Failed to commit", persistenceId(), e);
}
}
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "{}: Failed to commit", persistenceId());
+ LOG.error("{}: Failed to commit", persistenceId(), e);
}
}
}
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
}
else if (data instanceof CompositeModificationPayload) {
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
+ LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
} finally {
LOG.info("{}: Done applying snapshot", persistenceId());
}
import akka.actor.ActorRef;
import akka.actor.Status;
-import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
private final int queueCapacity;
- private final LoggingAdapter log;
+ private final Logger log;
private final String name;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.japi.Function;
import akka.japi.Procedure;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
/**
*/
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
private final DataPersistenceProvider dataPersistenceProvider;
/**
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
*/
- protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ protected ShardManager(ClusterWrapper cluster, Configuration configuration,
DatastoreContext datastoreContext) {
- this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+ this.type = datastoreContext.getDataStoreType();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
}
- public static Props props(final String type,
+ public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
final DatastoreContext datastoreContext) {
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
}
@Override
knownModules = ImmutableSet.copyOf(msg.getModules());
} else if (message instanceof RecoveryFailure) {
RecoveryFailure failure = (RecoveryFailure) message;
- LOG.error(failure.cause(), "Recovery failed");
+ LOG.error("Recovery failed", failure.cause());
} else if (message instanceof RecoveryCompleted) {
LOG.info("Recovery complete : {}", persistenceId());
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
- StringBuilder sb = new StringBuilder();
- for(StackTraceElement element : t.getStackTrace()) {
- sb.append("\n\tat ")
- .append(element.toString());
- }
- LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
+ LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
}
}
private static class ShardManagerCreator implements Creator<ShardManager> {
private static final long serialVersionUID = 1L;
- final String type;
final ClusterWrapper cluster;
final Configuration configuration;
final DatastoreContext datastoreContext;
- ShardManagerCreator(String type, ClusterWrapper cluster,
+ ShardManagerCreator(ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
- this.type = type;
this.cluster = cluster;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration, datastoreContext);
+ return new ShardManager(cluster, configuration, datastoreContext);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.event.LoggingAdapter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
- private final LoggingAdapter log;
+ private final Logger log;
private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.datastore.messages.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TerminationMonitor extends UntypedActor{
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
public TerminationMonitor(){
- LOG.info("Created TerminationMonitor");
+ LOG.debug("Created TerminationMonitor");
}
@Override public void onReceive(Object message) throws Exception {
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+ };
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
-
- futureList.add(actorContext.executeOperationAsync(cohort, message));
+ futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@Override
public ListenableFuture<Void> commit() {
- return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
- CommitTransactionReply.SERIALIZABLE_CLASS, true);
+ OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+ new CommitCallback(actorContext);
+
+ return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+ return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException) {
+ final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} {}", transactionId, operationName);
if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
- returnFuture);
+ returnFuture, callback);
} else {
buildCohortList().onComplete(new OnComplete<Void>() {
@Override
}
} else {
finishVoidOperation(operationName, message, expectedResponseClass,
- propagateException, returnFuture);
+ propagateException, returnFuture, callback);
}
}
}, actorContext.getActorSystem().dispatcher());
}
private void finishVoidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException,
- final SettableFuture<Void> returnFuture) {
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
+
+ callback.run();
+
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
}
if(exceptionToPropagate != null) {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
}
returnFuture.set(null);
}
+
+ callback.failure();
} else {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
}
returnFuture.set(null);
+
+ callback.success();
}
}
}, actorContext.getActorSystem().dispatcher());
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
+
+ private static interface OperationCallback {
+ void run();
+ void success();
+ void failure();
+ }
+
+ private static class CommitCallback implements OperationCallback{
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+ private static final String COMMIT = "commit";
+
+ private final Timer commitTimer;
+ private final ActorContext actorContext;
+ private Timer.Context timerContext;
+
+ CommitCallback(ActorContext actorContext){
+ this.actorContext = actorContext;
+ commitTimer = actorContext.getOperationTimer(COMMIT);
+ }
+
+ @Override
+ public void run() {
+ timerContext = commitTimer.time();
+ }
+
+ @Override
+ public void success() {
+ timerContext.stop();
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+ long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+ .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+ // Here we are trying to find out how many transactions per second are allowed
+ double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+ actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+ // This would mean we couldn't get a transaction completed in 30 seconds which is
+ // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+ // not going to be useful - so we leave it as it is
+ }
+ }
+
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
}
*/
package org.opendaylight.controller.cluster.datastore.compat;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
*/
public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
- private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+ private static final Logger LOG = LoggerFactory.getLogger(BackwardsCompatibleThreePhaseCommitCohort.class);
private final String transactionId;
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
* but should not be passed to actors especially remote actors
*/
public class ActorContext {
- private static final Logger
- LOG = LoggerFactory.getLogger(ActorContext.class);
-
- public static final String MAILBOX = "bounded-mailbox";
-
+ private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+ private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+ private static final String METRIC_RATE = "rate";
+ private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
return actualFailure;
}
};
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private volatile SchemaContext schemaContext;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final RateLimiter txRateLimiter;
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+ private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
+ private final Timeout transactionCommitOperationTimeout;
+
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
+ this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
- TimeUnit.SECONDS);
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
+
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+ jmxReporter.start();
}
public DatastoreContext getDatastoreContext() {
public int getTransactionOutstandingOperationLimit(){
return transactionOutstandingOperationLimit;
}
+
+ /**
+ * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+ * us to create a timer for pretty much anything.
+ *
+ * @param operationName
+ * @return
+ */
+ public Timer getOperationTimer(String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+ return metricRegistry.timer(rate);
+ }
+
+ /**
+ * Get the type of the data store to which this ActorContext belongs
+ *
+ * @return
+ */
+ public String getDataStoreType() {
+ return datastoreContext.getDataStoreType();
+ }
+
+ /**
+ * Set the number of transaction creation permits that are to be allowed
+ *
+ * @param permitsPerSecond
+ */
+ public void setTxCreationLimit(double permitsPerSecond){
+ txRateLimiter.setRate(permitsPerSecond);
+ }
+
+ /**
+ * Get the current transaction creation rate limit
+ * @return
+ */
+ public double getTxCreationLimit(){
+ return txRateLimiter.getRate();
+ }
+
+ /**
+ * Try to acquire a transaction creation permit. Will block if no permits are available.
+ */
+ public void acquireTxCreationPermit(){
+ txRateLimiter.acquire();
+ }
+
+ /**
+ * Return the operation timeout to be used when committing transactions
+ * @return
+ */
+ public Timeout getTransactionCommitOperationTimeout(){
+ return transactionCommitOperationTimeout;
+ }
+
+
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedConfigDatastore")
+ .dataStoreType("config")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
datastoreContext, bundleContext);
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedOperationalDatastore")
+ .dataStoreType("operational")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("operational",
- getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
+ return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
+ datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
description "The interval at which the leader of the shard will check if its majority
followers are active and term itself as isolated";
}
+
+ leaf tx-creation-initial-rate-limit {
+ default 100;
+ type non-zero-uint32-type;
+ description "The initial number of transactions per second that are allowed before the data store
+ should begin applying back pressure. This number is only used as an initial guidance,
+ subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+ }
}
// Augments the 'configuration' choice node under modules/module.
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DatastoreContextTest {
+
+ private DatastoreContext.Builder builder;
+
+ @Before
+ public void setUp(){
+ builder = new DatastoreContext.Builder();
+ }
+
+ @Test
+ public void testDefaults(){
+ DatastoreContext build = builder.build();
+
+ assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout());
+ assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
+ assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+ assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
+ assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
+ assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
+ assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
+ assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+ assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
+ assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+ }
+
+}
\ No newline at end of file
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
ShardStrategyFactory.setConfiguration(config);
+ datastoreContextBuilder.dataStoreType(typeName);
+
DatastoreContext datastoreContext = datastoreContextBuilder.build();
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
config, datastoreContext);
SchemaContext schemaContext = SchemaContextHelper.full();
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+ private SchemaContext schemaContext;
+
+ @Mock
+ private ActorContext actorContext;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ schemaContext = TestModel.createTestContext();
+
+ doReturn(schemaContext).when(actorContext).getSchemaContext();
+ }
+
+ @Test
+ public void testRateLimitingUsedInReadWriteTxCreation(){
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ distributedDataStore.newReadWriteTransaction();
+
+ verify(actorContext, times(1)).acquireTxCreationPermit();
+ }
+
+ @Test
+ public void testRateLimitingUsedInWriteOnlyTxCreation(){
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ distributedDataStore.newWriteOnlyTransaction();
+
+ verify(actorContext, times(1)).acquireTxCreationPermit();
+ }
+
+
+ @Test
+ public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+ DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+ distributedDataStore.newReadOnlyTransaction();
+ distributedDataStore.newReadOnlyTransaction();
+ distributedDataStore.newReadOnlyTransaction();
+
+ verify(actorContext, times(0)).acquireTxCreationPermit();
+ }
+
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
}
private Props newShardMgrProps() {
- return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+
+ DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+ builder.dataStoreType(shardMrgIDSuffix);
+ return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
}
@Test
public void testRecoveryApplicable(){
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build());
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> persistentShardManager =
TestActorRef.create(getSystem(), persistentProps);
assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
- final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(false).build());
+ final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> nonPersistentShardManager =
TestActorRef.create(getSystem(), nonPersistentProps);
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
@Override
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
DataPersistenceProviderMonitor dataPersistenceProviderMonitor
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
TestShardManager(String shardMrgIDSuffix) {
- super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+ super(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
}
@Override
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ doReturn(10.0).when(actorContext).getTxCreationLimit();
}
private Future<ActorSelection> newCohort() {
}
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
- isA(requestType));
+ isA(requestType), any(Timeout.class));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeOperationAsync(
- any(ActorSelection.class), isA(requestType));
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
try {
propagateExecutionExceptionCause(proxy.commit());
} finally {
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
}
+
}
@Test
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
new CommitTransactionReply(), new CommitTransactionReply());
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
proxy.canCommit().get(5, TimeUnit.SECONDS);
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+ // Verify that the creation limit was changed to 0.5 (based on setup)
+ verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+ }
+
+ @Test
+ public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verify(actorContext, never()).setTxCreationLimit(anyLong());
}
}
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
ActorContext actorContext = null;
SchemaContext schemaContext = mock(SchemaContext.class);
+ @Mock
+ ActorContext mockActorContext;
+
@Before
public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
actorContext = new MockActorContext(getSystem());
actorContext.setSchemaContext(schemaContext);
+
+ doReturn(schemaContext).when(mockActorContext).getSchemaContext();
}
@SuppressWarnings("resource")
Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
}
+
+ @Test
+ public void testRateLimitingUsedInReadWriteTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newReadWriteTransaction();
+
+ verify(mockActorContext, times(1)).acquireTxCreationPermit();
+ }
+
+ @Test
+ public void testRateLimitingUsedInWriteOnlyTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newWriteOnlyTransaction();
+
+ verify(mockActorContext, times(1)).acquireTxCreationPermit();
+ }
+
+
+ @Test
+ public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newReadOnlyTransaction();
+
+ verify(mockActorContext, times(0)).acquireTxCreationPermit();
+ }
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
assertEquals(expected, actual);
}
+ @Test
+ public void testRateLimiting(){
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
+
+ // Check that the initial value is being picked up from DataStoreContext
+ assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+ actorContext.setTxCreationLimit(1.0);
+
+ assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+ StopWatch watch = new StopWatch();
+
+ watch.start();
+
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+
+ watch.stop();
+
+ assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+ }
}
<groupId>org.opendaylight.controller.model</groupId>
<artifactId>model-inventory</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>ietf-topology</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-broker-impl</artifactId>
import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
import org.opendaylight.controller.sal.connect.netconf.NetconfStateSchemas;
import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceSalFacade;
import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
private static final Logger logger = LoggerFactory.getLogger(NetconfConnectorModule.class);
private BundleContext bundleContext;
- private Optional<NetconfSessionCapabilities> userCapabilities;
+ private Optional<NetconfSessionPreferences> userCapabilities;
private SchemaSourceRegistry schemaRegistry;
private SchemaContextFactory schemaContextFactory;
@Override
public java.lang.AutoCloseable createInstance() {
- final RemoteDeviceId id = new RemoteDeviceId(getIdentifier());
+ final RemoteDeviceId id = new RemoteDeviceId(getIdentifier(), getSocketAddress());
final ExecutorService globalProcessingExecutor = getProcessingExecutorDependency().getExecutor();
final Broker domBroker = getDomRegistryDependency();
final BindingAwareBroker bindingBroker = getBindingRegistryDependency();
- final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade
+ final RemoteDeviceHandler<NetconfSessionPreferences> salFacade
= new NetconfDeviceSalFacade(id, domBroker, bindingBroker, bundleContext, globalProcessingExecutor);
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
return new MyAutoCloseable(listener, salFacade);
}
- private Optional<NetconfSessionCapabilities> getUserCapabilities() {
+ private Optional<NetconfSessionPreferences> getUserCapabilities() {
if(getYangModuleCapabilities() == null) {
return Optional.absent();
}
return Optional.absent();
}
- final NetconfSessionCapabilities parsedOverrideCapabilities = NetconfSessionCapabilities.fromStrings(capabilities);
+ final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities);
JmxAttributeValidationException.checkCondition(
parsedOverrideCapabilities.getNonModuleCaps().isEmpty(),
"Capabilities to override can only contain module based capabilities, non-module capabilities will be retrieved from the device," +
}
private static final class MyAutoCloseable implements AutoCloseable {
- private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+ private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final NetconfDeviceCommunicator listener;
public MyAutoCloseable(final NetconfDeviceCommunicator listener,
- final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+ final RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
this.listener = listener;
this.salFacade = salFacade;
}
void onRemoteSessionDown();
+ void onRemoteSessionFailed(Throwable throwable);
+
void onNotification(M notification);
}
void onDeviceDisconnected();
+ void onDeviceFailed(Throwable throwable);
+
void onNotification(CompositeNode domNotification);
void close();
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.sal.connect.api.RemoteDevice;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
/**
* This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
*/
-public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage> {
private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
private final RemoteDeviceId id;
private final SchemaContextFactory schemaContextFactory;
- private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+ private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
private final ListeningExecutorService processingExecutor;
private final SchemaSourceRegistry schemaRegistry;
private final MessageTransformer<NetconfMessage> messageTransformer;
private final NotificationHandler notificationHandler;
private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
- public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+ public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
this.id = id;
this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
}
@Override
- public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
+ public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
final RemoteDeviceCommunicator<NetconfMessage> listener) {
// SchemaContext setup has to be performed in a dedicated thread since
// we are in a netty thread in this method
};
Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
+
}
- private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
+ private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
updateMessageTransformer(result);
salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
notificationHandler.onRemoteSchemaUp();
resetMessageTransformer();
}
+ @Override
+ public void onRemoteSessionFailed(Throwable throwable) {
+ salFacade.onDeviceFailed(throwable);
+ }
+
@Override
public void onNotification(final NetconfMessage notification) {
notificationHandler.handleNotification(notification);
*/
private static class DeviceSourcesResolver implements Callable<DeviceSources> {
private final NetconfDeviceRpc deviceRpc;
- private final NetconfSessionCapabilities remoteSessionCapabilities;
+ private final NetconfSessionPreferences remoteSessionCapabilities;
private final RemoteDeviceId id;
private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
- public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+ public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
this.deviceRpc = deviceRpc;
this.remoteSessionCapabilities = remoteSessionCapabilities;
this.id = id;
*/
private final class RecursiveSchemaSetup implements Runnable {
private final DeviceSources deviceSources;
- private final NetconfSessionCapabilities remoteSessionCapabilities;
+ private final NetconfSessionPreferences remoteSessionCapabilities;
private final NetconfDeviceRpc deviceRpc;
private final RemoteDeviceCommunicator<NetconfMessage> listener;
+ private NetconfDeviceCapabilities capabilities;
- public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+ public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
this.deviceSources = deviceSources;
this.remoteSessionCapabilities = remoteSessionCapabilities;
this.deviceRpc = deviceRpc;
this.listener = listener;
+ this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
}
@Override
/**
* Recursively build schema context, in case of success or final failure notify device
*/
+ // FIXME reimplement without recursion
private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
@Override
public void onSuccess(final SchemaContext result) {
logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
+ Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
+ capabilities.addCapabilities(filteredQNames);
+ capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
}
if (t instanceof MissingSchemaSourceException) {
final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), FailureReason.MissingSource);
setUpSchema(stripMissingSource(requiredSources, missingSource));
// In case resolution error, try only with resolved sources
} else if (t instanceof SchemaResolutionException) {
// TODO check for infinite loop
final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
+ final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
+ capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve);
logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
setUpSchema(resolutionException.getResolvedSources());
// unknown error, fail
Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
return sourceIdentifiers;
}
+
+ private Collection<QName> getQNameFromSourceIdentifiers(Collection<SourceIdentifier> identifiers) {
+ Collection<QName> qNames = new HashSet<>();
+ for (SourceIdentifier source : identifiers) {
+ Optional<QName> qname = getQNameFromSourceIdentifier(source);
+ if (qname.isPresent()) {
+ qNames.add(qname.get());
+ }
+ }
+ if (qNames.isEmpty()) {
+ logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
+ }
+ return qNames;
+ }
+
+ private Optional<QName> getQNameFromSourceIdentifier(SourceIdentifier identifier) {
+ for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
+ if (qname.getLocalName().equals(identifier.getName())
+ && qname.getFormattedRevision().equals(identifier.getRevision())) {
+ return Optional.of(qname);
+ }
+ }
+ throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
+ }
}
}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
* Factory for NetconfStateSchemas
*/
public interface NetconfStateSchemasResolver {
- NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id);
+ NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id);
}
/**
public static final class NetconfStateSchemasResolverImpl implements NetconfStateSchemasResolver {
@Override
- public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+ public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
return NetconfStateSchemas.create(deviceRpc, remoteSessionCapabilities, id);
}
}
/**
* Issue get request to remote device and parse response to find all schemas under netconf-state/schemas
*/
- private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+ private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
if(remoteSessionCapabilities.isMonitoringSupported() == false) {
logger.warn("{}: Netconf monitoring not supported on device, cannot detect provided schemas");
return EMPTY;
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.listener;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public final class NetconfDeviceCapabilities {
+ private final Map<QName, FailureReason> unresolvedCapabilites;
+ private final Set<QName> resolvedCapabilities;
+
+ private final Set<String> nonModuleBasedCapabilities;
+
+ public NetconfDeviceCapabilities() {
+ this.unresolvedCapabilites = new HashMap<>();
+ this.resolvedCapabilities = new HashSet<>();
+ this.nonModuleBasedCapabilities = new HashSet<>();
+ }
+
+ public void addUnresolvedCapability(QName source, FailureReason reason) {
+ unresolvedCapabilites.put(source, reason);
+ }
+
+ public void addUnresolvedCapabilities(Collection<QName> capabilities, FailureReason reason) {
+ for (QName s : capabilities) {
+ unresolvedCapabilites.put(s, reason);
+ }
+ }
+
+ public void addCapabilities(Collection<QName> availableSchemas) {
+ resolvedCapabilities.addAll(availableSchemas);
+ }
+
+ public void addNonModuleBasedCapabilities(Collection<String> nonModuleCapabilities) {
+ this.nonModuleBasedCapabilities.addAll(nonModuleCapabilities);
+ }
+
+ public Set<String> getNonModuleBasedCapabilities() {
+ return nonModuleBasedCapabilities;
+ }
+
+ public Map<QName, FailureReason> getUnresolvedCapabilites() {
+ return unresolvedCapabilites;
+ }
+
+ public Set<QName> getResolvedCapabilities() {
+ return resolvedCapabilities;
+ }
+
+}
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
- private final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice;
- private final Optional<NetconfSessionCapabilities> overrideNetconfCapabilities;
+ private final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice;
+ private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
private final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
private NetconfClientSession session;
private Future<?> initFuture;
- public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice,
- final NetconfSessionCapabilities netconfSessionCapabilities) {
- this(id, remoteDevice, Optional.of(netconfSessionCapabilities));
+ public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice,
+ final NetconfSessionPreferences netconfSessionPreferences) {
+ this(id, remoteDevice, Optional.of(netconfSessionPreferences));
}
public NetconfDeviceCommunicator(final RemoteDeviceId id,
- final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice) {
- this(id, remoteDevice, Optional.<NetconfSessionCapabilities>absent());
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice) {
+ this(id, remoteDevice, Optional.<NetconfSessionPreferences>absent());
}
- private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice,
- final Optional<NetconfSessionCapabilities> overrideNetconfCapabilities) {
+ private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice,
+ final Optional<NetconfSessionPreferences> overrideNetconfCapabilities) {
this.id = id;
this.remoteDevice = remoteDevice;
this.overrideNetconfCapabilities = overrideNetconfCapabilities;
logger.debug("{}: Session established", id);
this.session = session;
- NetconfSessionCapabilities netconfSessionCapabilities =
- NetconfSessionCapabilities.fromNetconfSession(session);
- logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities);
+ NetconfSessionPreferences netconfSessionPreferences =
+ NetconfSessionPreferences.fromNetconfSession(session);
+ logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
if(overrideNetconfCapabilities.isPresent()) {
- netconfSessionCapabilities = netconfSessionCapabilities.replaceModuleCaps(overrideNetconfCapabilities.get());
- logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionCapabilities);
+ netconfSessionPreferences = netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get());
+ logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionPreferences);
}
- remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this);
+ remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
}
finally {
sessionLock.unlock();
} else {
initFuture = dispatch.createClient(config);
}
+
+ initFuture.addListener(new GenericFutureListener<Future<Object>>(){
+
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.debug("{}: Connection failed", id, future.cause());
+ NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
+ }
+ }
+ });
}
private void tearDown( String reason ) {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class NetconfSessionCapabilities {
+public final class NetconfSessionPreferences {
private static final class ParameterMatcher {
private final Predicate<String> predicate;
}
}
- private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionCapabilities.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionPreferences.class);
private static final ParameterMatcher MODULE_PARAM = new ParameterMatcher("module=");
private static final ParameterMatcher REVISION_PARAM = new ParameterMatcher("revision=");
private static final ParameterMatcher BROKEN_REVISON_PARAM = new ParameterMatcher("amp;revision=");
private final Set<QName> moduleBasedCaps;
private final Set<String> nonModuleCaps;
- private NetconfSessionCapabilities(final Set<String> nonModuleCaps, final Set<QName> moduleBasedCaps) {
+ private NetconfSessionPreferences(final Set<String> nonModuleCaps, final Set<QName> moduleBasedCaps) {
this.nonModuleCaps = Preconditions.checkNotNull(nonModuleCaps);
this.moduleBasedCaps = Preconditions.checkNotNull(moduleBasedCaps);
}
|| containsNonModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString());
}
- public NetconfSessionCapabilities replaceModuleCaps(final NetconfSessionCapabilities netconfSessionModuleCapabilities) {
+ public NetconfSessionPreferences replaceModuleCaps(final NetconfSessionPreferences netconfSessionModuleCapabilities) {
final Set<QName> moduleBasedCaps = Sets.newHashSet(netconfSessionModuleCapabilities.getModuleBasedCaps());
// Preserve monitoring module, since it indicates support for ietf-netconf-monitoring
if(containsModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING)) {
moduleBasedCaps.add(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING);
}
- return new NetconfSessionCapabilities(getNonModuleCaps(), moduleBasedCaps);
+ return new NetconfSessionPreferences(getNonModuleCaps(), moduleBasedCaps);
}
- public static NetconfSessionCapabilities fromNetconfSession(final NetconfClientSession session) {
+ public static NetconfSessionPreferences fromNetconfSession(final NetconfClientSession session) {
return fromStrings(session.getServerCapabilities());
}
return QName.cachedReference(QName.create(URI.create(namespace), null, moduleName).withoutRevision());
}
- public static NetconfSessionCapabilities fromStrings(final Collection<String> capabilities) {
+ public static NetconfSessionPreferences fromStrings(final Collection<String> capabilities) {
final Set<QName> moduleBasedCaps = new HashSet<>();
final Set<String> nonModuleCaps = Sets.newHashSet(capabilities);
addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, moduleName));
}
- return new NetconfSessionCapabilities(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps));
+ return new NetconfSessionPreferences(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps));
}
moduleBasedCaps.add(qName);
nonModuleCaps.remove(capability);
}
+
+ private NetconfDeviceCapabilities capabilities = new NetconfDeviceCapabilities();
+
+ public NetconfDeviceCapabilities getNetconfDeviceCapabilities() {
+ return capabilities;
+ }
+
+
}
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.tx.ReadOnlyTx;
import org.opendaylight.controller.sal.connect.netconf.sal.tx.ReadWriteTx;
import org.opendaylight.controller.sal.connect.netconf.sal.tx.WriteCandidateTx;
final class NetconfDeviceDataBroker implements DOMDataBroker {
private final RemoteDeviceId id;
private final NetconfBaseOps netconfOps;
- private final NetconfSessionCapabilities netconfSessionPreferences;
+ private final NetconfSessionPreferences netconfSessionPreferences;
private final DataNormalizer normalizer;
- public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, final NetconfSessionCapabilities netconfSessionPreferences) {
+ public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, final NetconfSessionPreferences netconfSessionPreferences) {
this.id = id;
this.netconfOps = new NetconfBaseOps(rpc);
this.netconfSessionPreferences = netconfSessionPreferences;
*
* All data changes are submitted to an ExecutorService to avoid Thread blocking while sal is waiting for schema.
*/
+@Deprecated
final class NetconfDeviceDatastoreAdapter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceDatastoreAdapter.class);
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionCapabilities> {
+public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionPreferences> {
private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceSalFacade.class);
@Override
public synchronized void onDeviceConnected(final SchemaContext schemaContext,
- final NetconfSessionCapabilities netconfSessionPreferences, final RpcImplementation deviceRpc) {
+ final NetconfSessionPreferences netconfSessionPreferences, final RpcImplementation deviceRpc) {
// TODO move SchemaAwareRpcBroker from sal-broker-impl, now we have depend on the whole sal-broker-impl
final RpcProvisionRegistry rpcRegistry = new SchemaAwareRpcBroker(id.getPath().toString(), new SchemaContextProvider() {
salProvider.getMountInstance().onDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService);
salProvider.getDatastoreAdapter().updateDeviceState(true, netconfSessionPreferences.getModuleBasedCaps());
+ salProvider.getMountInstance().onTopologyDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService);
+ salProvider.getTopologyDatastoreAdapter().updateDeviceData(true, netconfSessionPreferences.getNetconfDeviceCapabilities());
}
@Override
public synchronized void onDeviceDisconnected() {
salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
+ salProvider.getTopologyDatastoreAdapter().updateDeviceData(false, new NetconfDeviceCapabilities());
salProvider.getMountInstance().onDeviceDisconnected();
+ salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+ salProvider.getTopologyDatastoreAdapter().setDeviceAsFailed(throwable);
+ salProvider.getMountInstance().onDeviceDisconnected();
+ salProvider.getMountInstance().onTopologyDeviceDisconnected();
}
private void registerRpcsToSal(final SchemaContext schemaContext, final RpcProvisionRegistry rpcRegistry, final RpcImplementation deviceRpc) {
private volatile NetconfDeviceDatastoreAdapter datastoreAdapter;
private MountInstance mountInstance;
+ private volatile NetconfDeviceTopologyAdapter topologyDatastoreAdapter;
+
public NetconfDeviceSalProvider(final RemoteDeviceId deviceId, final ExecutorService executor) {
this.id = deviceId;
this.executor = executor;
return datastoreAdapter;
}
+ public NetconfDeviceTopologyAdapter getTopologyDatastoreAdapter() {
+ Preconditions.checkState(topologyDatastoreAdapter != null,
+ "%s: Sal provider %s was not initialized by sal. Cannot get topology datastore adapter", id);
+ return topologyDatastoreAdapter;
+ }
+
@Override
public void onSessionInitiated(final Broker.ProviderSession session) {
logger.debug("{}: (BI)Session with sal established {}", id, session);
final DataBroker dataBroker = session.getSALService(DataBroker.class);
datastoreAdapter = new NetconfDeviceDatastoreAdapter(id, dataBroker);
+
+ topologyDatastoreAdapter = new NetconfDeviceTopologyAdapter(id, dataBroker);
}
public void close() throws Exception {
private ObjectRegistration<DOMMountPoint> registration;
private NotificationPublishService notificationSerivce;
+ private ObjectRegistration<DOMMountPoint> topologyRegistration;
+
MountInstance(final DOMMountPointService mountService, final RemoteDeviceId id) {
this.mountService = Preconditions.checkNotNull(mountService);
this.id = Preconditions.checkNotNull(id);
}
+ @Deprecated
synchronized void onDeviceConnected(final SchemaContext initialCtx,
final DOMDataBroker broker, final RpcProvisionRegistry rpc,
final NotificationPublishService notificationSerivce) {
registration = mountBuilder.register();
}
+ @Deprecated
synchronized void onDeviceDisconnected() {
if(registration == null) {
return;
}
}
+ synchronized void onTopologyDeviceConnected(final SchemaContext initialCtx,
+ final DOMDataBroker broker, final RpcProvisionRegistry rpc,
+ final NotificationPublishService notificationSerivce) {
+
+ Preconditions.checkNotNull(mountService, "Closed");
+ Preconditions.checkState(topologyRegistration == null, "Already initialized");
+
+ final DOMMountPointService.DOMMountPointBuilder mountBuilder = mountService.createMountPoint(id.getTopologyPath());
+ mountBuilder.addInitialSchemaContext(initialCtx);
+
+ mountBuilder.addService(DOMDataBroker.class, broker);
+ mountBuilder.addService(RpcProvisionRegistry.class, rpc);
+ this.notificationSerivce = notificationSerivce;
+ mountBuilder.addService(NotificationPublishService.class, notificationSerivce);
+
+ topologyRegistration = mountBuilder.register();
+ }
+
+ synchronized void onTopologyDeviceDisconnected() {
+ if(topologyRegistration == null) {
+ return;
+ }
+
+ try {
+ topologyRegistration.close();
+ } catch (final Exception e) {
+ // Only log and ignore
+ logger.warn("Unable to unregister mount instance for {}. Ignoring exception", id.getTopologyPath(), e);
+ } finally {
+ topologyRegistration = null;
+ }
+ }
+
@Override
synchronized public void close() throws Exception {
if(registration != null) {
onDeviceDisconnected();
+ onTopologyDeviceDisconnected();
}
mountService = null;
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.UnavailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.UnavailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class NetconfDeviceTopologyAdapter implements AutoCloseable {
+
+ public static final Logger logger = LoggerFactory.getLogger(NetconfDeviceTopologyAdapter.class);
+ public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
+ @Override
+ public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
+ return new UnavailableCapabilityBuilder()
+ .setCapability(input.getKey().toString())
+ .setFailureReason(input.getValue()).build();
+ }
+ };
+ public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
+ @Override
+ public String apply(QName qName) {
+ return qName.toString();
+ }
+ };
+
+ private final RemoteDeviceId id;
+ private final DataBroker dataService;
+
+ private final InstanceIdentifier<NetworkTopology> networkTopologyPath;
+ private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
+ private static final String UNKNOWN_REASON = "Unknown reason";
+
+ NetconfDeviceTopologyAdapter(final RemoteDeviceId id, final DataBroker dataService) {
+ this.id = id;
+ this.dataService = dataService;
+
+ this.networkTopologyPath = InstanceIdentifier.builder(NetworkTopology.class).build();
+ this.topologyListPath = networkTopologyPath.child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
+
+ initDeviceData();
+ }
+
+ private void initDeviceData() {
+ final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+
+ createNetworkTopologyIfNotPresent(writeTx);
+
+ final InstanceIdentifier<Node> path = id.getTopologyBindingPath();
+ NodeBuilder nodeBuilder = getNodeIdBuilder(id);
+ NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder();
+ netconfNodeBuilder.setConnectionStatus(ConnectionStatus.Connecting);
+ netconfNodeBuilder.setHost(id.getHost());
+ netconfNodeBuilder.setPort(new PortNumber(id.getAddress().getPort()));
+ nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build());
+ Node node = nodeBuilder.build();
+
+ logger.trace("{}: Init device state transaction {} putting if absent operational data started.", id, writeTx.getIdentifier());
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, path, node);
+ logger.trace("{}: Init device state transaction {} putting operational data ended.", id, writeTx.getIdentifier());
+
+ logger.trace("{}: Init device state transaction {} putting if absent config data started.", id, writeTx.getIdentifier());
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, path, getNodeWithId(id));
+ logger.trace("{}: Init device state transaction {} putting config data ended.", id, writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "init");
+ }
+
+ public void updateDeviceData(boolean up, NetconfDeviceCapabilities capabilities) {
+ final Node data = buildDataForNetconfNode(up, capabilities);
+
+ final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+ logger.trace("{}: Update device state transaction {} merging operational data started.", id, writeTx.getIdentifier());
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath(), data);
+ logger.trace("{}: Update device state transaction {} merging operational data ended.", id, writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "update");
+ }
+
+ public void setDeviceAsFailed(Throwable throwable) {
+ String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
+
+ final NetconfNode netconfNode = new NetconfNodeBuilder().setConnectionStatus(ConnectionStatus.UnableToConnect).setConnectedMessage(reason).build();
+ final Node data = getNodeIdBuilder(id).addAugmentation(NetconfNode.class, netconfNode).build();
+
+ final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+ logger.trace("{}: Setting device state as failed {} putting operational data started.", id, writeTx.getIdentifier());
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath(), data);
+ logger.trace("{}: Setting device state as failed {} putting operational data ended.", id, writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "update-failed-device");
+ }
+
+ private Node buildDataForNetconfNode(boolean up, NetconfDeviceCapabilities capabilities) {
+ List<String> capabilityList = new ArrayList<>();
+ capabilityList.addAll(capabilities.getNonModuleBasedCapabilities());
+ capabilityList.addAll(FluentIterable.from(capabilities.getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
+ final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
+ avCapabalitiesBuilder.setAvailableCapability(capabilityList);
+
+ final UnavailableCapabilities unavailableCapabilities =
+ new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(capabilities.getUnresolvedCapabilites().entrySet())
+ .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
+
+ final NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder()
+ .setHost(id.getHost())
+ .setPort(new PortNumber(id.getAddress().getPort()))
+ .setConnectionStatus(up ? ConnectionStatus.Connected : ConnectionStatus.Connecting)
+ .setAvailableCapabilities(avCapabalitiesBuilder.build())
+ .setUnavailableCapabilities(unavailableCapabilities);
+
+ final NodeBuilder nodeBuilder = getNodeIdBuilder(id);
+ final Node node = nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build()).build();
+
+ return node;
+ }
+
+ public void removeDeviceConfiguration() {
+ final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+
+ logger.trace("{}: Close device state transaction {} removing all data started.", id, writeTx.getIdentifier());
+ writeTx.delete(LogicalDatastoreType.CONFIGURATION, id.getTopologyBindingPath());
+ writeTx.delete(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath());
+ logger.trace("{}: Close device state transaction {} removing all data ended.", id, writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "close");
+ }
+
+ private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) {
+
+ final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
+ logger.trace("{}: Merging {} container to ensure its presence", id, networkTopology.QNAME, writeTx.getIdentifier());
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, networkTopologyPath, networkTopology);
+ writeTx.merge(LogicalDatastoreType.OPERATIONAL, networkTopologyPath, networkTopology);
+
+ final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(TopologyNetconf.QNAME.getLocalName())).build();
+ logger.trace("{}: Merging {} container to ensure its presence", id, topology.QNAME, writeTx.getIdentifier());
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, topologyListPath, topology);
+ writeTx.merge(LogicalDatastoreType.OPERATIONAL, topologyListPath, topology);
+ }
+
+ private void commitTransaction(final WriteTransaction transaction, final String txType) {
+ logger.trace("{}: Committing Transaction {}:{}", id, txType, transaction.getIdentifier());
+ final CheckedFuture<Void, TransactionCommitFailedException> result = transaction.submit();
+
+ Futures.addCallback(result, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ logger.trace("{}: Transaction({}) {} SUCCESSFUL", id, txType, transaction.getIdentifier());
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ logger.error("{}: Transaction({}) {} FAILED!", id, txType, transaction.getIdentifier(), t);
+ throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly", t);
+ }
+ });
+
+ }
+
+ private static Node getNodeWithId(final RemoteDeviceId id) {
+ final NodeBuilder builder = getNodeIdBuilder(id);
+ return builder.build();
+ }
+
+ private static NodeBuilder getNodeIdBuilder(final RemoteDeviceId id) {
+ final NodeBuilder nodeBuilder = new NodeBuilder();
+ nodeBuilder.setKey(new NodeKey(new NodeId(id.getName())));
+ return nodeBuilder;
+ }
+
+ @Override
+ public void close() throws Exception {
+ removeDeviceConfiguration();
+ }
+}
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.yang.common.RpcResult;
protected final RemoteDeviceId id;
protected final NetconfBaseOps netOps;
protected final DataNormalizer normalizer;
- protected final NetconfSessionCapabilities netconfSessionPreferences;
+ protected final NetconfSessionPreferences netconfSessionPreferences;
// Allow commit to be called only once
protected boolean finished = false;
- public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+ public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
this.netOps = netOps;
this.id = id;
this.normalizer = normalizer;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
private static final Logger LOG = LoggerFactory.getLogger(WriteCandidateRunningTx.class);
- public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+ public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
super(id, netOps, normalizer, netconfSessionPreferences);
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
}
};
- public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+ public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
super(rpc, id, normalizer, netconfSessionPreferences);
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
private static final Logger LOG = LoggerFactory.getLogger(WriteRunningTx.class);
public WriteRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps,
- final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+ final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
super(netOps, id, normalizer, netconfSessionPreferences);
}
*/
package org.opendaylight.controller.sal.connect.util;
+import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.HostBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class RemoteDeviceId {
+public final class RemoteDeviceId {
private final String name;
private final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier path;
private final InstanceIdentifier<Node> bindingPath;
private final NodeKey key;
+ private final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier topologyPath;
+ private final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> topologyBindingPath;
+ private InetSocketAddress address;
+ private Host host;
+ @Deprecated
public RemoteDeviceId(final ModuleIdentifier identifier) {
this(Preconditions.checkNotNull(identifier).getInstanceName());
}
+ public RemoteDeviceId(final ModuleIdentifier identifier, Host host) {
+ this(identifier);
+ this.host = host;
+ }
+
+ public RemoteDeviceId(final ModuleIdentifier identifier, InetSocketAddress address) {
+ this(identifier);
+ this.address = address;
+ this.host = buildHost();
+ }
+
+ @Deprecated
public RemoteDeviceId(final String name) {
Preconditions.checkNotNull(name);
this.name = name;
this.key = new NodeKey(new NodeId(name));
this.path = createBIPath(name);
this.bindingPath = createBindingPath(key);
+ this.topologyPath = createBIPathForTopology(name);
+ this.topologyBindingPath = createBindingPathForTopology(key);
+ }
+
+ public RemoteDeviceId(final String name, InetSocketAddress address) {
+ this(name);
+ this.address = address;
+ this.host = buildHost();
}
private static InstanceIdentifier<Node> createBindingPath(final NodeKey key) {
return builder.build();
}
+ private static InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> createBindingPathForTopology(final NodeKey key) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.builder(NetworkTopology.class).build();
+ final KeyedInstanceIdentifier<Topology, TopologyKey> topology = networkTopology.child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
+ return topology
+ .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class,
+ new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey
+ (new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId(key.getId().getValue())));
+ }
+
+ private static org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier createBIPathForTopology(final String name) {
+ final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.InstanceIdentifierBuilder builder =
+ org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.builder();
+ builder
+ .node(NetworkTopology.QNAME)
+ .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), TopologyNetconf.QNAME.getLocalName())
+ .nodeWithKey(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.QNAME,
+ QName.create(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.QNAME, "node-id"), name);
+ return builder.build();
+ }
+
+ private Host buildHost() {
+ return address.getAddress().getHostAddress() != null
+ ? HostBuilder.getDefaultInstance(address.getAddress().getHostAddress())
+ : HostBuilder.getDefaultInstance(address.getAddress().getHostName());
+ }
+
public String getName() {
return name;
}
return key;
}
+ public InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> getTopologyBindingPath() {
+ return topologyBindingPath;
+ }
+
+ public YangInstanceIdentifier getTopologyPath() {
+ return topologyPath;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public Host getHost() {
+ return host;
+ }
+
@Override
public String toString() {
return "RemoteDevice{" + name +'}';
--- /dev/null
+module netconf-node-topology {
+ namespace "urn:opendaylight:netconf-node-topology";
+ prefix "nettop";
+
+ import network-topology { prefix nt; revision-date 2013-10-21; }
+ import yang-ext { prefix ext; revision-date "2013-07-09";}
+ import ietf-inet-types { prefix inet; revision-date "2010-09-24"; }
+
+ revision "2015-01-14" {
+ description "Initial revision of Topology model";
+ }
+
+ augment "/nt:network-topology/nt:topology/nt:topology-types" {
+ container topology-netconf {
+ }
+ }
+
+ grouping netconf-node-fields {
+ leaf connection-status {
+ type enumeration {
+ enum connecting;
+ enum connected;
+ enum unable-to-connect;
+ }
+ }
+
+ leaf host {
+ type inet:host;
+ }
+
+ leaf port {
+ type inet:port-number;
+ }
+
+ leaf connected-message {
+ type string;
+ }
+
+ container available-capabilities {
+ leaf-list available-capability {
+ type string;
+ }
+ }
+
+ container unavailable-capabilities {
+ list unavailable-capability {
+ leaf capability {
+ type string;
+ }
+
+ leaf failure-reason {
+ type enumeration {
+ enum missing-source;
+ enum unable-to-resolve;
+ }
+ }
+ }
+ }
+
+ container pass-through {
+ when "../connection-status = connected";
+ description
+ "When the underlying node is connected, its NETCONF context
+ is available verbatim under this container through the
+ mount extension.";
+ }
+ }
+
+ augment "/nt:network-topology/nt:topology/nt:node" {
+ when "../../nt:topology-types/topology-netconf";
+ ext:augment-identifier "netconf-node";
+
+ uses netconf-node-fields;
+ }
+}
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+
import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
private static final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver = new NetconfStateSchemas.NetconfStateSchemasResolver() {
@Override
- public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+ public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
return NetconfStateSchemas.EMPTY;
}
};
public void testNetconfDeviceFailFirstSchemaFailSecondEmpty() throws Exception {
final ArrayList<String> capList = Lists.newArrayList(TEST_CAPABILITY);
- final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
final SchemaContextFactory schemaFactory = getSchemaFactory();
= new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer());
// Monitoring not supported
- final NetconfSessionCapabilities sessionCaps = getSessionCaps(false, capList);
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(false, capList);
device.onRemoteSessionUp(sessionCaps, listener);
Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
@Test
public void testNetconfDeviceMissingSource() throws Exception {
- final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
final SchemaContextFactory schemaFactory = getSchemaFactory();
= new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer());
// Monitoring supported
- final NetconfSessionCapabilities sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2));
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2));
device.onRemoteSessionUp(sessionCaps, listener);
- Mockito.verify(facade, Mockito.timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+ Mockito.verify(facade, Mockito.timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
Mockito.verify(schemaFactory, times(2)).createSchemaContext(anyCollectionOf(SourceIdentifier.class));
}
@Test
public void testNotificationBeforeSchema() throws Exception {
- final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
verify(facade, times(0)).onNotification(any(CompositeNode.class));
- final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
Lists.newArrayList(TEST_CAPABILITY));
device.onRemoteSessionUp(sessionCaps, listener);
@Test
public void testNetconfDeviceReconnect() throws Exception {
- final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+ final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
final SchemaContextFactory schemaContextProviderFactory = getSchemaFactory();
final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
= new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaContextProviderFactory, stateSchemasResolver);
final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer);
- final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+ final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION));
device.onRemoteSessionUp(sessionCaps, listener);
verify(schemaContextProviderFactory, timeout(5000)).createSchemaContext(any(Collection.class));
verify(messageTransformer, timeout(5000)).onGlobalContextUpdated(any(SchemaContext.class));
- verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+ verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
device.onRemoteSessionDown();
verify(facade, timeout(5000)).onDeviceDisconnected();
verify(schemaContextProviderFactory, timeout(5000).times(2)).createSchemaContext(any(Collection.class));
verify(messageTransformer, timeout(5000).times(3)).onGlobalContextUpdated(any(SchemaContext.class));
- verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+ verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
}
private SchemaContextFactory getSchemaFactory() {
return parser.resolveSchemaContext(models);
}
- private RemoteDeviceHandler<NetconfSessionCapabilities> getFacade() throws Exception {
- final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
- doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+ private RemoteDeviceHandler<NetconfSessionPreferences> getFacade() throws Exception {
+ final RemoteDeviceHandler<NetconfSessionPreferences> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
+ doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
return remoteDeviceHandler;
return messageTransformer;
}
- public NetconfSessionCapabilities getSessionCaps(final boolean addMonitor, final Collection<String> additionalCapabilities) {
+ public NetconfSessionPreferences getSessionCaps(final boolean addMonitor, final Collection<String> additionalCapabilities) {
final ArrayList<String> capabilities = Lists.newArrayList(
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
capabilities.addAll(additionalCapabilities);
- return NetconfSessionCapabilities.fromStrings(
+ return NetconfSessionPreferences.fromStrings(
capabilities);
}
NetconfClientSession mockSession;
@Mock
- RemoteDevice<NetconfSessionCapabilities, NetconfMessage> mockDevice;
+ RemoteDevice<NetconfSessionPreferences, NetconfMessage> mockDevice;
NetconfDeviceCommunicator communicator;
void setupSession()
{
doReturn( Collections.<String>emptySet() ).when( mockSession ).getServerCapabilities();
- doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionCapabilities.class ),
+ doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionPreferences.class ),
any( RemoteDeviceCommunicator.class ) );
communicator.onSessionUp( mockSession );
}
testCapability );
doReturn( serverCapabilities ).when( mockSession ).getServerCapabilities();
- ArgumentCaptor<NetconfSessionCapabilities> netconfSessionCapabilities =
- ArgumentCaptor.forClass( NetconfSessionCapabilities.class );
+ ArgumentCaptor<NetconfSessionPreferences> netconfSessionCapabilities =
+ ArgumentCaptor.forClass( NetconfSessionPreferences.class );
doNothing().when( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) );
communicator.onSessionUp( mockSession );
verify( mockSession ).getServerCapabilities();
verify( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) );
- NetconfSessionCapabilities actualCapabilites = netconfSessionCapabilities.getValue();
+ NetconfSessionPreferences actualCapabilites = netconfSessionCapabilities.getValue();
assertEquals( "containsModuleCapability", true, actualCapabilites.containsNonModuleCapability(
NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString()) );
assertEquals( "containsModuleCapability", false, actualCapabilites.containsNonModuleCapability(testCapability) );
*/
@Test
public void testNetconfDeviceReconnectInCommunicator() throws Exception {
- final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> device = mock(RemoteDevice.class);
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage> device = mock(RemoteDevice.class);
final TimedReconnectStrategy timedReconnectStrategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, 10000, 0, 1.0, null, 100L, null);
final ReconnectStrategy reconnectStrategy = spy(new ReconnectStrategy() {
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.yangtools.yang.common.QName;
-public class NetconfSessionCapabilitiesTest {
+public class NetconfSessionPreferencesTest {
@Test
public void testMerge() throws Exception {
"urn:ietf:params:netconf:base:1.0",
"urn:ietf:params:netconf:capability:rollback-on-error:1.0"
);
- final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1);
+ final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1);
assertCaps(sessionCaps1, 2, 3);
final List<String> caps2 = Lists.newArrayList(
"namespace:4?module=module4&revision=2012-12-12",
"randomNonModuleCap"
);
- final NetconfSessionCapabilities sessionCaps2 = NetconfSessionCapabilities.fromStrings(caps2);
+ final NetconfSessionPreferences sessionCaps2 = NetconfSessionPreferences.fromStrings(caps2);
assertCaps(sessionCaps2, 1, 2);
- final NetconfSessionCapabilities merged = sessionCaps1.replaceModuleCaps(sessionCaps2);
+ final NetconfSessionPreferences merged = sessionCaps1.replaceModuleCaps(sessionCaps2);
assertCaps(merged, 2, 2 + 1 /*Preserved monitoring*/);
for (final QName qName : sessionCaps2.getModuleBasedCaps()) {
assertThat(merged.getModuleBasedCaps(), hasItem(qName));
"namespace:2?module=module2&RANDOMSTRING;revision=2013-12-12" // This one should be ignored(same as first), since revision is in wrong format
);
- final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1);
+ final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1);
assertCaps(sessionCaps1, 0, 3);
}
- private void assertCaps(final NetconfSessionCapabilities sessionCaps1, final int nonModuleCaps, final int moduleCaps) {
+ private void assertCaps(final NetconfSessionPreferences sessionCaps1, final int nonModuleCaps, final int moduleCaps) {
assertEquals(nonModuleCaps, sessionCaps1.getNonModuleCaps().size());
assertEquals(moduleCaps, sessionCaps1.getModuleBasedCaps().size());
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.Futures;
+import java.net.InetSocketAddress;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class NetconfDeviceTopologyAdapterTest {
+
+ private RemoteDeviceId id = new RemoteDeviceId("test", new InetSocketAddress("localhost", 22));
+
+ @Mock
+ private DataBroker broker;
+ @Mock
+ private WriteTransaction writeTx;
+ @Mock
+ private Node data;
+
+ private String txIdent = "test transaction";
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ doReturn(writeTx).when(broker).newWriteOnlyTransaction();
+ doNothing().when(writeTx).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+ doNothing().when(writeTx).merge(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+
+ doReturn(txIdent).when(writeTx).getIdentifier();
+ }
+
+ @Test
+ public void testFailedDevice() throws Exception {
+ doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
+
+ NetconfDeviceTopologyAdapter adapter = new NetconfDeviceTopologyAdapter(id, broker);
+ adapter.setDeviceAsFailed(null);
+
+ verify(broker, times(2)).newWriteOnlyTransaction();
+ verify(writeTx, times(3)).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+ }
+
+ @Test
+ public void testDeviceUpdate() throws Exception {
+ doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
+
+ NetconfDeviceTopologyAdapter adapter = new NetconfDeviceTopologyAdapter(id, broker);
+ adapter.updateDeviceData(true, new NetconfDeviceCapabilities());
+
+ verify(broker, times(2)).newWriteOnlyTransaction();
+ verify(writeTx, times(3)).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+ }
+
+}
\ No newline at end of file
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@Test
public void testDiscardChanges() {
final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc), normalizer,
- NetconfSessionCapabilities.fromStrings(Collections.<String>emptySet()));
+ NetconfSessionPreferences.fromStrings(Collections.<String>emptySet()));
final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
try {
submitFuture.checkedGet();
.when(rpc).invokeRpc(any(QName.class), any(CompositeNode.class));
final WriteRunningTx tx = new WriteRunningTx(id, new NetconfBaseOps(rpc), normalizer,
- NetconfSessionCapabilities.fromStrings(Collections.<String>emptySet()));
+ NetconfSessionPreferences.fromStrings(Collections.<String>emptySet()));
try {
tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
} catch (final Exception e) {
import akka.actor.Terminated;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.common.actor.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TerminationMonitor extends UntypedActor{
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
public TerminationMonitor(){
- LOG.info("Created TerminationMonitor");
+ LOG.debug("Created TerminationMonitor");
}
@Override public void onReceive(Object message) throws Exception {
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
*/
public class RpcRegistry extends BucketStore<RoutingTable> {
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
public RpcRegistry() {
getLocalBucket().setData(new RoutingTable());
}
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterActorRefProvider;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A store that syncs its data across nodes in the cluster.
private static final Long NO_VERSION = -1L;
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* Bucket owned by the node
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
public class Gossiper extends AbstractUntypedActorWithMetering {
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final Logger log = LoggerFactory.getLogger(getClass());
private Cluster cluster;
@Override
public void postStop(){
- if (cluster != null)
+ if (cluster != null) {
cluster.unsubscribe(getSelf());
- if (gossipTask != null)
+ }
+ if (gossipTask != null) {
gossipTask.cancel();
+ }
}
@Override
protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick)
+ if (message instanceof GossipTick) {
receiveGossipTick();
-
- //Message from remote gossiper with its bucket versions
- else if (message instanceof GossipStatus)
+ } else if (message instanceof GossipStatus) {
+ // Message from remote gossiper with its bucket versions
receiveGossipStatus((GossipStatus) message);
-
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
- else if (message instanceof GossipEnvelope)
+ } else if (message instanceof GossipEnvelope) {
+ // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+ // message. The contained buckets are newer as determined by the remote gossiper by
+ // comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
-
- else if (message instanceof ClusterEvent.MemberUp) {
+ } else if (message instanceof ClusterEvent.MemberUp) {
receiveMemberUp(((ClusterEvent.MemberUp) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
} else if ( message instanceof ClusterEvent.UnreachableMember){
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
- } else
+ } else {
unhandled(message);
+ }
}
/**
*/
void receiveMemberUp(Member member) {
- if (selfAddress.equals(member.address()))
+ if (selfAddress.equals(member.address())) {
return; //ignore up notification for self
+ }
- if (!clusterMembers.contains(member.address()))
+ if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
+ }
if(log.isDebugEnabled()) {
log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
void receiveGossipTick(){
- if (clusterMembers.size() == 0) return; //no members to send gossip status to
+ if (clusterMembers.size() == 0) {
+ return; //no members to send gossip status to
+ }
Address remoteMemberToGossipTo;
- if (clusterMembers.size() == 1)
+ if (clusterMembers.size() == 1) {
remoteMemberToGossipTo = clusterMembers.get(0);
- else {
+ } else {
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
*/
void receiveGossipStatus(GossipStatus status){
//Don't accept messages from non-members
- if (!clusterMembers.contains(status.from()))
+ if (!clusterMembers.contains(status.from())) {
return;
+ }
final ActorRef sender = getSender();
Future<Object> futureReply =
for (Address address : remoteVersions.keySet()){
- if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+ if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
continue; //this condition is taken care of by above diffs
- if (localVersions.get(address) < remoteVersions.get(address))
+ }
+ if (localVersions.get(address) < remoteVersions.get(address)) {
localIsOlder.add(address);
- else if (localVersions.get(address) > remoteVersions.get(address))
+ } else if (localVersions.get(address) > remoteVersions.get(address)) {
localIsNewer.add(address);
+ }
}
- if (!localIsOlder.isEmpty())
+ if (!localIsOlder.isEmpty()) {
sendGossipStatusTo(sender, localVersions );
+ }
- if (!localIsNewer.isEmpty())
+ if (!localIsNewer.isEmpty()) {
sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+ }
}
return null;
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-parser-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-netconf-connector</artifactId>
+ </dependency>
</dependencies>
<build>
}
case SSH: {
writeStatus(consoleIO, "Connecting to %s via SSH. Please wait.", cliArgs.getAddress());
- connectionManager.connectBlocking(cliArgs.getAddress(), getClientSshConfig(cliArgs));
+ connectionManager.connectBlocking(cliArgs.getAddress(), cliArgs.getServerAddress(), getClientSshConfig(cliArgs));
break;
}
case NONE: {/* Do not connect initially */
import org.opendaylight.controller.netconf.cli.io.ConsoleContext;
import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* Implementation of RemoteDeviceHandler. Integrates cli with
* sal-netconf-connector.
*/
-public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<NetconfSessionCapabilities> {
+public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<NetconfSessionPreferences> {