From: Tony Tkacik Date: Thu, 12 Feb 2015 12:18:47 +0000 (+0000) Subject: Merge "BUG-2627: do not duplicate descriptions" X-Git-Tag: release/lithium~481^2~4 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=4b207b5356775c4b4d231ae979f9f2134f617dd1;hp=6f9b565103acbc6eae1c50e5ef609241ebc8486e Merge "BUG-2627: do not duplicate descriptions" --- diff --git a/features/netconf/pom.xml b/features/netconf/pom.xml index 028c16b02f..997b6a275f 100644 --- a/features/netconf/pom.xml +++ b/features/netconf/pom.xml @@ -9,7 +9,7 @@ features-netconf - pom + jar features.xml @@ -38,6 +38,18 @@ org.opendaylight.controller netconf-auth + + org.opendaylight.controller + netconf-notifications-api + + + org.opendaylight.controller + netconf-notifications-impl + + + org.opendaylight.controller + ietf-netconf + org.opendaylight.controller ietf-netconf-monitoring @@ -46,6 +58,10 @@ org.opendaylight.controller ietf-netconf-monitoring-extension + + org.opendaylight.controller + ietf-netconf-notifications + org.opendaylight.yangtools.model ietf-inet-types @@ -124,6 +140,20 @@ org.opendaylight.controller netconf-monitoring + + + org.opendaylight.yangtools + features-test + ${yangtools.version} + test + + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + zip + @@ -169,6 +199,21 @@ + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire.version} + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/features/netconf/src/main/resources/features.xml b/features/netconf/src/main/resources/features.xml index 2affa27d19..a65502124b 100644 --- a/features/netconf/src/main/resources/features.xml +++ b/features/netconf/src/main/resources/features.xml @@ -85,6 +85,8 @@ odl-netconf-notifications-api + odl-netconf-util + odl-yangtools-binding-generator mvn:org.opendaylight.controller/netconf-notifications-impl/${project.version} diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index a05d02cd09..7f5233c827 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -238,7 +238,7 @@ public abstract class AbstractDispatcher, L extends * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt * * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. - * success if it indicates no further attempts should be made and failure if it reports an error + * success is never reported, only failure when it runs out of reconnection attempts. */ protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, final PipelineInitializer initializer) { diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index aaec95a74b..865c666ad2 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -15,6 +15,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import org.slf4j.Logger; @@ -55,6 +56,15 @@ final class ReconnectPromise, L extends SessionList channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this)); } }); + + pending.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + ReconnectPromise.this.setFailure(future.cause()); + } + } + }); } /** diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java index 8022e72157..fe25c75ae2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java @@ -11,14 +11,13 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; -import akka.event.Logging; -import akka.event.LoggingAdapter; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientActor extends UntypedActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + protected final Logger LOG = LoggerFactory.getLogger(getClass()); private final ActorRef target; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 9aff86ba2b..c5ae4c41b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -125,7 +125,7 @@ public class ExampleActor extends RaftActor { try { bs = fromObject(state); } catch (Exception e) { - LOG.error(e, "Exception in creating snapshot"); + LOG.error("Exception in creating snapshot", e); } getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); } @@ -135,7 +135,7 @@ public class ExampleActor extends RaftActor { try { state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { - LOG.error(e, "Exception in applying snapshot"); + LOG.error("Exception in applying snapshot", e); } if(LOG.isDebugEnabled()) { LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 766b80e73d..3dc6ae469a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; @@ -43,6 +41,8 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -85,8 +85,7 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** * The current state determines the current behavior of a RaftActor @@ -338,8 +337,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:", - persistenceId()); + LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", + persistenceId(), saveSnapshotFailure.cause()); context.getReplicatedLog().snapshotRollback(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 0e1f20b246..9d391a1588 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -12,9 +12,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.event.LoggingAdapter; - import java.util.Map; +import org.slf4j.Logger; /** * The RaftActorContext contains that portion of the RaftActors state that @@ -106,7 +105,7 @@ public interface RaftActorContext { * * @return */ - LoggingAdapter getLogger(); + Logger getLogger(); /** * Get a mapping of peerId's to their addresses diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 5438fe7c48..b71b3be352 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -8,15 +8,14 @@ package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Preconditions.checkState; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActorContext; -import akka.event.LoggingAdapter; import java.util.Map; - -import static com.google.common.base.Preconditions.checkState; +import org.slf4j.Logger; public class RaftActorContextImpl implements RaftActorContext { @@ -36,7 +35,7 @@ public class RaftActorContextImpl implements RaftActorContext { private final Map peerAddresses; - private final LoggingAdapter LOG; + private final Logger LOG; private final ConfigParams configParams; @@ -47,7 +46,7 @@ public class RaftActorContextImpl implements RaftActorContext { ElectionTerm termInformation, long commitIndex, long lastApplied, ReplicatedLog replicatedLog, Map peerAddresses, ConfigParams configParams, - LoggingAdapter logger) { + Logger logger) { this.actor = actor; this.context = context; this.id = id; @@ -115,7 +114,7 @@ public class RaftActorContextImpl implements RaftActorContext { return context.system(); } - @Override public LoggingAdapter getLogger() { + @Override public Logger getLogger() { return this.LOG; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 8f33d94700..9b6c08857a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -129,7 +128,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + sendAppendEntries(0); } /** @@ -425,18 +424,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { - sendAppendEntries(); + sendAppendEntries(0); } } - private void sendAppendEntries() { + private void sendAppendEntries(long timeSinceLastActivityInterval) { // Send an AppendEntries to all followers - long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { + if(!followerLogInformation.isFollowerActive() || + followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { sendUpdatesToFollower(followerId, followerLogInformation, true); } } @@ -638,7 +637,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); + LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e); } } @@ -661,7 +660,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - sendAppendEntries(); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 99824b0bb4..075b2873e4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; -import akka.event.LoggingAdapter; import java.util.Random; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -24,6 +23,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; /** @@ -46,7 +46,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** * */ - protected final LoggingAdapter LOG; + protected final Logger LOG; /** * @@ -349,7 +349,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - LOG.warning( + LOG.warn( "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}", context.getId(), i, i, index); break; @@ -394,7 +394,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { try { close(); } catch (Exception e) { - LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state()); + LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e); } return behavior; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 410b3c266c..8a0788702d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -342,7 +342,7 @@ public class Follower extends AbstractRaftActorBehavior { snapshotTracker = null; } catch (Exception e){ - LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId()); + LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e); //send reply with success as false. The chunk will be sent again on failure sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), installSnapshot.getChunkIndex(), false), actor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index 26fbde0711..d26837f180 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -8,22 +8,22 @@ package org.opendaylight.controller.cluster.raft.behaviors; -import akka.event.LoggingAdapter; import com.google.common.base.Optional; import com.google.protobuf.ByteString; +import org.slf4j.Logger; /** * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower */ public class SnapshotTracker { - private final LoggingAdapter LOG; + private final Logger LOG; private final int totalChunks; private ByteString collectedChunks = ByteString.EMPTY; private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1; private boolean sealed = false; private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - SnapshotTracker(LoggingAdapter LOG, int totalChunks){ + SnapshotTracker(Logger LOG, int totalChunks){ this.LOG = LOG; this.totalChunks = totalChunks; } @@ -77,6 +77,8 @@ public class SnapshotTracker { } public static class InvalidChunkException extends Exception { + private static final long serialVersionUID = 1L; + InvalidChunkException(String message){ super(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 9d3e5dcb12..4d33152b41 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -12,8 +12,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import java.io.Serializable; @@ -22,6 +20,8 @@ import java.util.Map; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MockRaftActorContext implements RaftActorContext { @@ -88,7 +88,8 @@ public class MockRaftActorContext implements RaftActorContext { public void initReplicatedLog(){ this.replicatedLog = new SimpleReplicatedLog(); - this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload(""))); + this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1"))); + this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2"))); } @Override public ActorRef actorOf(Props props) { @@ -144,8 +145,8 @@ public class MockRaftActorContext implements RaftActorContext { return this.system; } - @Override public LoggingAdapter getLogger() { - return Logging.getLogger(system, this); + @Override public Logger getLogger() { + return LoggerFactory.getLogger(getClass()); } @Override public Map getPeerAddresses() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index cf7af439e5..9e0e06c70b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,5 +1,18 @@ package org.opendaylight.controller.cluster.raft; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; @@ -62,20 +75,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class RaftActorTest extends AbstractActorTest { @@ -1173,7 +1172,10 @@ public class RaftActorTest extends AbstractActorTest { // simulate a real snapshot leaderActor.onReceiveCommand(new InitiateInstallSnapshot()); assertEquals(5, leaderActor.getReplicatedLog().size()); - assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ", + leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId()) + , RaftState.Leader, leaderActor.getCurrentBehavior().state()); + //reply from a slow follower does not initiate a fake snapshot leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 666cea69ec..3f551b3a30 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,5 +1,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -41,14 +44,16 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; +import org.slf4j.impl.SimpleLogger; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - public class LeaderTest extends AbstractRaftActorBehaviorTest { + static { + // This enables trace logging for the tests. + System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace"); + } + private final ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); private final ActorRef senderActor = @@ -70,47 +75,50 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { @Override protected void run() { - ActorRef followerActor = getTestActor(); MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + String followerId = "follower"; + peerAddresses.put(followerId, followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); + long term = 1; + actorContext.getTermInformation().update(term, ""); + Leader leader = new Leader(actorContext); - leader.markFollowerActive(followerActor.path().toString()); - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); - leader.handleMessage(senderActor, new SendHeartBeat()); - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - Object msg = fromSerializableMessage(in); - if (msg instanceof AppendEntries) { - if (((AppendEntries)msg).getTerm() == 0) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + // Leader should send an immediate heartbeat with no entries as follower is inactive. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); + assertEquals("getTerm", term, appendEntries.getTerm()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); - assertEquals("match", out); + // The follower would normally reply - simulate that explicitly here. + leader.handleMessage(followerActor, new AppendEntriesReply( + followerId, term, true, lastIndex - 1, term)); + assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive()); + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). + getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + leader.handleMessage(senderActor, new SendHeartBeat()); + + appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } }; }}; @@ -119,55 +127,51 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testHandleReplicateMessageSendAppendEntriesToFollower() { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { @Override protected void run() { - ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = - (MockRaftActorContext) createActorContext(); + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + String followerId = "follower"; + peerAddresses.put(followerId, followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); + long term = 1; + actorContext.getTermInformation().update(term, ""); + Leader leader = new Leader(actorContext); - leader.markFollowerActive(followerActor.path().toString()); - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); - RaftActorBehavior raftBehavior = leader - .handleMessage(senderActor, new Replicate(null, null, - new MockRaftActorContext.MockReplicatedLogEntry(1, - 100, - new MockRaftActorContext.MockPayload("foo")) - )); + + // Leader will send an immediate heartbeat - ignore it. + expectMsgClass(duration("5 seconds"), AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + followerId, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive()); + + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, lastIndex + 1, payload); + actorContext.getReplicatedLog().append(newEntry); + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, + new Replicate(null, null, newEntry)); // State should not change assertTrue(raftBehavior instanceof Leader); - final String out = - new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - Object msg = fromSerializableMessage(in); - if (msg instanceof AppendEntries) { - if (((AppendEntries)msg).getTerm() == 0) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); + AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } }; }}; @@ -176,7 +180,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() { new JavaTestKit(getSystem()) {{ - new Within(duration("1 seconds")) { @Override protected void run() { @@ -282,7 +285,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendHeartBeat()); - AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching( + AppendEntries aeproto = MessageCollectorActor.getFirstMatching( followerActor, AppendEntries.class); assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " + @@ -297,9 +300,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(senderActor, new SendHeartBeat()); - InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot) - MessageCollectorActor.getFirstMatching(followerActor, - InstallSnapshot.SERIALIZABLE_CLASS); + InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor, + InstallSnapshot.SERIALIZABLE_CLASS); assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot", isproto); @@ -435,7 +437,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior raftBehavior = leader.handleMessage( leaderActor, new InitiateInstallSnapshot()); - CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor. + CaptureSnapshot cs = MessageCollectorActor. getFirstMatching(leaderActor, CaptureSnapshot.class); assertNotNull(cs); @@ -491,6 +493,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Leader leader = new Leader(actorContext); + // Ignore initial heartbeat. + expectMsgClass(duration("5 seconds"), AppendEntries.class); + // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -558,6 +563,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockLeader leader = new MockLeader(actorContext); + // Ignore initial heartbeat. + expectMsgClass(duration("5 seconds"), AppendEntries.class); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -734,15 +742,17 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - Object o = MessageCollectorActor.getAllMessages(followerActor).get(0); + MessageCollectorActor.getAllMatching(followerActor, + InstallSnapshotMessages.InstallSnapshot.class); - assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); - - InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); + followerActor.underlyingActor().clear(); leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false)); @@ -752,11 +762,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendHeartBeat()); - o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1); - - assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); - - installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -769,7 +777,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef followerActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk"); @@ -811,11 +818,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); - Object o = MessageCollectorActor.getAllMessages(followerActor).get(0); - - assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); - - InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -823,17 +828,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { int hashCode = installSnapshot.getData().hashCode(); - leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); - - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - - leader.handleMessage(leaderActor, new SendHeartBeat()); + followerActor.underlyingActor().clear(); - o = MessageCollectorActor.getAllMessages(followerActor).get(1); - - assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); + leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); - installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o; + installSnapshot = MessageCollectorActor.getFirstMatching( + followerActor, InstallSnapshotMessages.InstallSnapshot.class); + assertNotNull(installSnapshot); assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -902,7 +903,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Override protected RaftActorContext createActorContext(ActorRef actorRef) { - return new MockRaftActorContext("test", getSystem(), actorRef); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef); + context.setConfigParams(configParams); + return context; } private ByteString toByteString(Map state) { @@ -931,43 +937,41 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { - private static AbstractRaftActorBehavior behavior; - - public ForwardMessageToBehaviorActor(){ - - } + AbstractRaftActorBehavior behavior; @Override public void onReceive(Object message) throws Exception { + if(behavior != null) { + behavior.handleMessage(sender(), message); + } + super.onReceive(message); - behavior.handleMessage(sender(), message); } - public static void setBehavior(AbstractRaftActorBehavior behavior){ - ForwardMessageToBehaviorActor.behavior = behavior; + public static Props props() { + return Props.create(ForwardMessageToBehaviorActor.class); } } @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + TestActorRef leaderActor = TestActorRef.create(getSystem(), + Props.create(ForwardMessageToBehaviorActor.class)); MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + new MockRaftActorContext("leader", getSystem(), leaderActor); - ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class)); + TestActorRef followerActor = TestActorRef.create(getSystem(), + ForwardMessageToBehaviorActor.props()); MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower", getSystem(), followerActor); + new MockRaftActorContext("follower", getSystem(), followerActor); Follower follower = new Follower(followerActorContext); - - ForwardMessageToBehaviorActor.setBehavior(follower); + followerActor.underlyingActor().behavior = follower; Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + peerAddresses.put("follower", followerActor.path().toString()); leaderActorContext.setPeerAddresses(peerAddresses); @@ -975,7 +979,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { //create 3 entries leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); leaderActorContext.setCommitIndex(1); @@ -983,37 +987,29 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { // follower too has the exact same log entries and has the same commit index followerActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); followerActorContext.setCommitIndex(1); Leader leader = new Leader(leaderActorContext); - leader.markFollowerActive(followerActor.path().toString()); - - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); - - leader.handleMessage(leaderActor, new SendHeartBeat()); - - AppendEntries appendEntries = (AppendEntries) MessageCollectorActor - .getFirstMatching(followerActor, AppendEntries.class); + AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); assertNotNull(appendEntries); assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); - AppendEntriesReply appendEntriesReply = - (AppendEntriesReply) MessageCollectorActor.getFirstMatching( + AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( leaderActor, AppendEntriesReply.class); - assertNotNull(appendEntriesReply); - // follower returns its next index assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); + // follower returns its next index + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); }}; } @@ -1021,68 +1017,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + TestActorRef leaderActor = TestActorRef.create(getSystem(), + Props.create(ForwardMessageToBehaviorActor.class)); MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + new MockRaftActorContext("leader", getSystem(), leaderActor); - ActorRef followerActor = getSystem().actorOf( - Props.create(ForwardMessageToBehaviorActor.class)); + TestActorRef followerActor = TestActorRef.create(getSystem(), + ForwardMessageToBehaviorActor.props()); MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower", getSystem(), followerActor); + new MockRaftActorContext("follower", getSystem(), followerActor); Follower follower = new Follower(followerActorContext); - - ForwardMessageToBehaviorActor.setBehavior(follower); + followerActor.underlyingActor().behavior = follower; Map peerAddresses = new HashMap<>(); - peerAddresses.put(followerActor.path().toString(), - followerActor.path().toString()); + peerAddresses.put("follower", followerActor.path().toString()); leaderActorContext.setPeerAddresses(peerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); leaderActorContext.setCommitIndex(1); followerActorContext.getReplicatedLog().removeFrom(0); followerActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); // follower has the same log entries but its commit index > leaders commit index followerActorContext.setCommitIndex(2); Leader leader = new Leader(leaderActorContext); - leader.markFollowerActive(followerActor.path().toString()); - - Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis()); - - leader.handleMessage(leaderActor, new SendHeartBeat()); - - AppendEntries appendEntries = (AppendEntries) MessageCollectorActor - .getFirstMatching(followerActor, AppendEntries.class); + // Initial heartbeat + AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); assertNotNull(appendEntries); assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); - AppendEntriesReply appendEntriesReply = - (AppendEntriesReply) MessageCollectorActor.getFirstMatching( + AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( leaderActor, AppendEntriesReply.class); + assertNotNull(appendEntriesReply); + + assertEquals(2, appendEntriesReply.getLogLastIndex()); + assertEquals(1, appendEntriesReply.getLogLastTerm()); + + leaderActor.underlyingActor().behavior = leader; + leader.handleMessage(followerActor, appendEntriesReply); + + leaderActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + + appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); + assertNotNull(appendEntries); + + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(0, appendEntries.getEntries().size()); + assertEquals(2, appendEntries.getPrevLogIndex()); + + appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); assertNotNull(appendEntriesReply); assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); + assertEquals(1, followerActorContext.getCommitIndex()); }}; } @@ -1156,8 +1167,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, leaderActorContext.getCommitIndex()); ApplyLogEntries applyLogEntries = - (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor, - ApplyLogEntries.class); + MessageCollectorActor.getFirstMatching(leaderActor, + ApplyLogEntries.class); assertNotNull(applyLogEntries); @@ -1295,78 +1306,91 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + TestActorRef leaderActor = TestActorRef.create(getSystem(), + Props.create(MessageCollectorActor.class)); MockRaftActorContext leaderActorContext = - new MockRaftActorContext("leader", getSystem(), leaderActor); + new MockRaftActorContext("leader", getSystem(), leaderActor); DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); leaderActorContext.setConfigParams(configParams); - ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class)); + TestActorRef followerActor = TestActorRef.create(getSystem(), + ForwardMessageToBehaviorActor.props()); MockRaftActorContext followerActorContext = - new MockRaftActorContext("follower-reply", getSystem(), followerActor); + new MockRaftActorContext("follower-reply", getSystem(), followerActor); followerActorContext.setConfigParams(configParams); Follower follower = new Follower(followerActorContext); - - ForwardMessageToBehaviorActor.setBehavior(follower); + followerActor.underlyingActor().behavior = follower; Map peerAddresses = new HashMap<>(); peerAddresses.put("follower-reply", - followerActor.path().toString()); + followerActor.path().toString()); leaderActorContext.setPeerAddresses(peerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); - //create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - - leaderActorContext.setCommitIndex(1); + followerActorContext.getReplicatedLog().removeFrom(0); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); Leader leader = new Leader(leaderActorContext); - leader.markFollowerActive("follower-reply"); + + AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching( + leaderActor, AppendEntriesReply.class); + assertNotNull(appendEntriesReply); + System.out.println("appendEntriesReply: "+appendEntriesReply); + leader.handleMessage(followerActor, appendEntriesReply); + + // Clear initial heartbeat messages + + leaderActor.underlyingActor().clear(); + followerActor.underlyingActor().clear(); + + // create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + leaderActorContext.setCommitIndex(1); + leaderActorContext.setLastApplied(1); Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, new SendHeartBeat()); - AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor - .getFirstMatching(followerActor, AppendEntries.class); - + AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); assertNotNull(appendEntries); + // Should send first log entry assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); - assertEquals(0, appendEntries.getPrevLogIndex()); - - AppendEntriesReply appendEntriesReply = - (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals(0, appendEntries.getEntries().get(0).getIndex()); + assertEquals(-1, appendEntries.getPrevLogIndex()); + appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); assertNotNull(appendEntriesReply); - leader.handleAppendEntriesReply(followerActor, appendEntriesReply); - - List entries = ForwardMessageToBehaviorActor - .getAllMatching(followerActor, AppendEntries.class); + assertEquals(1, appendEntriesReply.getLogLastTerm()); + assertEquals(0, appendEntriesReply.getLogLastIndex()); - assertEquals("AppendEntries count should be 2 ", 2, entries.size()); + followerActor.underlyingActor().clear(); - AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1); + leader.handleAppendEntriesReply(followerActor, appendEntriesReply); - assertEquals(1, appendEntriesSecond.getLeaderCommit()); - assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex()); - assertEquals(1, appendEntriesSecond.getPrevLogIndex()); + appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class); + assertNotNull(appendEntries); + // Should send second log entry + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getEntries().get(0).getIndex()); }}; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java index 1b3a8f5fb5..f103abcf84 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -1,8 +1,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import akka.event.LoggingAdapter; import com.google.common.base.Optional; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; @@ -13,9 +11,13 @@ import java.util.Map; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SnapshotTrackerTest { + Logger logger = LoggerFactory.getLogger(getClass()); + Map data; ByteString byteString; ByteString chunk1; @@ -37,14 +39,14 @@ public class SnapshotTrackerTest { @Test public void testAddChunk() throws SnapshotTracker.InvalidChunkException { - SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + SnapshotTracker tracker1 = new SnapshotTracker(logger, 5); tracker1.addChunk(1, chunk1, Optional.absent()); tracker1.addChunk(2, chunk2, Optional.absent()); tracker1.addChunk(3, chunk3, Optional.absent()); // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker - SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + SnapshotTracker tracker2 = new SnapshotTracker(logger, 2); tracker2.addChunk(1, chunk1, Optional.absent()); tracker2.addChunk(2, chunk2, Optional.absent()); @@ -57,7 +59,7 @@ public class SnapshotTrackerTest { } // The first chunk's index must at least be FIRST_CHUNK_INDEX - SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + SnapshotTracker tracker3 = new SnapshotTracker(logger, 2); try { tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); @@ -67,7 +69,7 @@ public class SnapshotTrackerTest { } // Out of sequence chunk indexes won't work - SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + SnapshotTracker tracker4 = new SnapshotTracker(logger, 2); tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); @@ -80,7 +82,7 @@ public class SnapshotTrackerTest { // No exceptions will be thrown when invalid chunk is added with the right sequence // If the lastChunkHashCode is missing - SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + SnapshotTracker tracker5 = new SnapshotTracker(logger, 2); tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); // Look I can add the same chunk again @@ -88,7 +90,7 @@ public class SnapshotTrackerTest { // An exception will be thrown when an invalid chunk is addedd with the right sequence // when the lastChunkHashCode is present - SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2); + SnapshotTracker tracker6 = new SnapshotTracker(logger, 2); tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); @@ -106,7 +108,7 @@ public class SnapshotTrackerTest { public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException { // Trying to get a snapshot before all chunks have been received will throw an exception - SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + SnapshotTracker tracker1 = new SnapshotTracker(logger, 5); tracker1.addChunk(1, chunk1, Optional.absent()); try { @@ -116,7 +118,7 @@ public class SnapshotTrackerTest { } - SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3); + SnapshotTracker tracker2 = new SnapshotTracker(logger, 3); tracker2.addChunk(1, chunk1, Optional.absent()); tracker2.addChunk(2, chunk2, Optional.absent()); @@ -129,7 +131,7 @@ public class SnapshotTrackerTest { @Test public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException { - SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5); + SnapshotTracker tracker1 = new SnapshotTracker(logger, 5); ByteString chunks = chunk1.concat(chunk2); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 3469a956c3..c5acb1f2a4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -13,6 +13,7 @@ import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -23,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration; public class MessageCollectorActor extends UntypedActor { - private List messages = new ArrayList<>(); + private final List messages = new ArrayList<>(); @Override public void onReceive(Object message) throws Exception { if(message instanceof String){ @@ -35,6 +36,10 @@ public class MessageCollectorActor extends UntypedActor { } } + public void clear() { + messages.clear(); + } + public static List getAllMessages(ActorRef actor) throws Exception { FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); Timeout operationTimeout = new Timeout(operationDuration); @@ -53,13 +58,17 @@ public class MessageCollectorActor extends UntypedActor { * @param clazz * @return */ - public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception { - List allMessages = getAllMessages(actor); + public static T getFirstMatching(ActorRef actor, Class clazz) throws Exception { + for(int i = 0; i < 50; i++) { + List allMessages = getAllMessages(actor); - for(Object message : allMessages){ - if(message.getClass().equals(clazz)){ - return message; + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + return (T) message; + } } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } return null; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java index 21a0cb6a88..a604b05c01 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java @@ -9,12 +9,11 @@ package org.opendaylight.controller.cluster.common.actor; import akka.actor.UntypedActor; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractUntypedActor extends UntypedActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + protected final Logger LOG = LoggerFactory.getLogger(getClass()); public AbstractUntypedActor() { if(LOG.isDebugEnabled()) { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java index 8a6217deab..95ee21674a 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java @@ -8,17 +8,16 @@ package org.opendaylight.controller.cluster.common.actor; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + protected final Logger LOG = LoggerFactory.getLogger(getClass()); public AbstractUntypedPersistentActor() { if(LOG.isDebugEnabled()) { @@ -119,7 +118,7 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc try { procedure.apply(o); } catch (Exception e) { - LOG.error(e, "An unexpected error occurred"); + LOG.error("An unexpected error occurred", e); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index d6030ea457..e27546f5dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -148,6 +148,11 @@ org.opendaylight.yangtools yang-data-impl + + org.apache.commons + commons-lang3 + + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 01e42dbb8e..cee781fb88 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.util.Timeout; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.text.WordUtils; import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader; import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader; import org.opendaylight.controller.cluster.raft.ConfigParams; @@ -25,37 +26,43 @@ import scala.concurrent.duration.FiniteDuration; */ public class DatastoreContext { - private final InMemoryDOMDataStoreConfigProperties dataStoreProperties; - private final Duration shardTransactionIdleTimeout; - private final int operationTimeoutInSeconds; - private final String dataStoreMXBeanType; - private final ConfigParams shardRaftConfig; - private final int shardTransactionCommitTimeoutInSeconds; - private final int shardTransactionCommitQueueCapacity; - private final Timeout shardInitializationTimeout; - private final Timeout shardLeaderElectionTimeout; - private final boolean persistent; - private final ConfigurationReader configurationReader; - private final long shardElectionTimeoutFactor; - - private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties, - ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds, - Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds, - int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout, - Timeout shardLeaderElectionTimeout, - boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) { - this.dataStoreProperties = dataStoreProperties; - this.shardRaftConfig = shardRaftConfig; - this.dataStoreMXBeanType = dataStoreMXBeanType; - this.operationTimeoutInSeconds = operationTimeoutInSeconds; - this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; - this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; - this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; - this.shardInitializationTimeout = shardInitializationTimeout; - this.shardLeaderElectionTimeout = shardLeaderElectionTimeout; - this.persistent = persistent; - this.configurationReader = configurationReader; - this.shardElectionTimeoutFactor = shardElectionTimeoutFactor; + public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES); + public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5; + public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30; + public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000; + public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000; + public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500; + public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10; + public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000; + public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES); + public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS); + public static final boolean DEFAULT_PERSISTENT = true; + public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader(); + public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12; + public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2; + public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100; + public static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + + private InMemoryDOMDataStoreConfigProperties dataStoreProperties; + private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT; + private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS; + private String dataStoreMXBeanType; + private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS; + private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY; + private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT; + private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT; + private boolean persistent = DEFAULT_PERSISTENT; + private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER; + private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT; + private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); + private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; + + private DatastoreContext(){ + setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); + setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT); + setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS); + setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS); + setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE); } public static Builder newBuilder() { @@ -79,7 +86,7 @@ public class DatastoreContext { } public ConfigParams getShardRaftConfig() { - return shardRaftConfig; + return raftConfig; } public int getShardTransactionCommitTimeoutInSeconds() { @@ -107,125 +114,140 @@ public class DatastoreContext { } public long getShardElectionTimeoutFactor(){ - return this.shardElectionTimeoutFactor; + return raftConfig.getElectionTimeoutFactor(); + } + + public String getDataStoreType(){ + return dataStoreType; + } + + public long getTransactionCreationInitialRateLimit() { + return transactionCreationInitialRateLimit; + } + + private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){ + raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, + TimeUnit.MILLISECONDS)); + } + + private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){ + raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); + } + + + private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) { + raftConfig.setIsolatedLeaderCheckInterval( + new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS)); + } + + private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) { + raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); + } + + private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { + raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); + } + + private void setSnapshotBatchCount(int shardSnapshotBatchCount) { + raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); } public static class Builder { - private InMemoryDOMDataStoreConfigProperties dataStoreProperties; - private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES); - private int operationTimeoutInSeconds = 5; - private String dataStoreMXBeanType; - private int shardTransactionCommitTimeoutInSeconds = 30; - private int shardJournalRecoveryLogBatchSize = 1000; - private int shardSnapshotBatchCount = 20000; - private int shardHeartbeatIntervalInMillis = 500; - private int shardTransactionCommitQueueCapacity = 20000; - private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES); - private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS); - private boolean persistent = true; - private ConfigurationReader configurationReader = new FileConfigurationReader(); - private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10; - private int shardSnapshotDataThresholdPercentage = 12; - private long shardElectionTimeoutFactor = 2; + private DatastoreContext datastoreContext = new DatastoreContext(); public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) { - this.shardTransactionIdleTimeout = shardTransactionIdleTimeout; + datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout; return this; } public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) { - this.operationTimeoutInSeconds = operationTimeoutInSeconds; + datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds; return this; } public Builder dataStoreMXBeanType(String dataStoreMXBeanType) { - this.dataStoreMXBeanType = dataStoreMXBeanType; + datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType; return this; } public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) { - this.dataStoreProperties = dataStoreProperties; + datastoreContext.dataStoreProperties = dataStoreProperties; return this; } public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) { - this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; + datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds; return this; } public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) { - this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize; + datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); return this; } public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) { - this.shardSnapshotBatchCount = shardSnapshotBatchCount; + datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount); return this; } public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) { - this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage; + datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); return this; } - public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) { - this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis; + datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis); return this; } public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) { - this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; + datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity; return this; } public Builder shardInitializationTimeout(long timeout, TimeUnit unit) { - this.shardInitializationTimeout = new Timeout(timeout, unit); + datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit); return this; } public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) { - this.shardLeaderElectionTimeout = new Timeout(timeout, unit); + datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit); return this; } public Builder configurationReader(ConfigurationReader configurationReader){ - this.configurationReader = configurationReader; + datastoreContext.configurationReader = configurationReader; return this; } public Builder persistent(boolean persistent){ - this.persistent = persistent; + datastoreContext.persistent = persistent; return this; } public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) { - this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis; + datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis); return this; } public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){ - this.shardElectionTimeoutFactor = shardElectionTimeoutFactor; + datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor); return this; } + public Builder transactionCreationInitialRateLimit(long initialRateLimit){ + datastoreContext.transactionCreationInitialRateLimit = initialRateLimit; + return this; + } - public DatastoreContext build() { - DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); - raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis, - TimeUnit.MILLISECONDS)); - raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize); - raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount); - raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage); - raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor); - raftConfig.setIsolatedLeaderCheckInterval( - new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS)); + public Builder dataStoreType(String dataStoreType){ + datastoreContext.dataStoreType = dataStoreType; + datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore"; + return this; + } - return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType, - operationTimeoutInSeconds, shardTransactionIdleTimeout, - shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity, - shardInitializationTimeout, shardLeaderElectionTimeout, - persistent, configurationReader, shardElectionTimeoutFactor); + public DatastoreContext build() { + return datastoreContext; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 930c5f7257..107c959112 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -39,20 +39,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au private final ActorContext actorContext; - public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, + public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); - Preconditions.checkNotNull(type, "type should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null"); + String type = datastoreContext.getDataStoreType(); + String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString(); LOG.info("Creating ShardManager : {}", shardManagerId); actorContext = new ActorContext(actorSystem, actorSystem.actorOf( - ShardManager.props(type, cluster, configuration, datastoreContext) + ShardManager.props(cluster, configuration, datastoreContext) .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration, datastoreContext); } @@ -94,11 +95,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index 5d63c92e88..a9a735ede7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -22,13 +22,13 @@ public class DistributedDataStoreFactory { private static volatile ActorSystem persistentActorSystem = null; - public static DistributedDataStore createInstance(String name, SchemaService schemaService, + public static DistributedDataStore createInstance(SchemaService schemaService, DatastoreContext datastoreContext, BundleContext bundleContext) { ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader()); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); final DistributedDataStore dataStore = - new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), + new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem), config, datastoreContext); ShardStrategyFactory.setConfiguration(config); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 744e2c22c6..87a0fb931e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -12,8 +12,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; @@ -101,8 +99,6 @@ public class Shard extends RaftActor { // The state of this Shard private final InMemoryDOMDataStore store; - private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); - /// The name of this shard private final ShardIdentifier name; @@ -220,8 +216,8 @@ public class Shard extends RaftActor { } if (message instanceof RecoveryFailure){ - LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause", - persistenceId()); + LOG.error("{}: Recovery failed because of this cause", + persistenceId(), ((RecoveryFailure) message).cause()); // Even though recovery failed, we still need to finish our recovery, eg send the // ActorInitialized message and start the txCommitTimeoutCheckSchedule. @@ -274,7 +270,7 @@ public class Shard extends RaftActor { if(cohortEntry != null) { long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime(); if(elapsed > transactionCommitTimeout) { - LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting", + LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting", persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout); doAbortTransaction(cohortEntry.getTransactionID(), null); @@ -322,8 +318,8 @@ public class Shard extends RaftActor { new ModificationPayload(cohortEntry.getModification())); } } catch (Exception e) { - LOG.error(e, "{} An exception occurred while preCommitting transaction {}", - persistenceId(), cohortEntry.getTransactionID()); + LOG.error("{} An exception occurred while preCommitting transaction {}", + persistenceId(), cohortEntry.getTransactionID(), e); shardMBean.incrementFailedTransactionsCount(); getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } @@ -376,7 +372,8 @@ public class Shard extends RaftActor { } catch (Exception e) { sender.tell(new akka.actor.Status.Failure(e), getSelf()); - LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID); + LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), + transactionID, e); shardMBean.incrementFailedTransactionsCount(); } finally { commitCoordinator.currentTransactionComplete(transactionID, true); @@ -445,7 +442,7 @@ public class Shard extends RaftActor { @Override public void onFailure(final Throwable t) { - LOG.error(t, "{}: An exception happened during abort", persistenceId()); + LOG.error("{}: An exception happened during abort", persistenceId(), t); if(sender != null) { sender.tell(new akka.actor.Status.Failure(t), self); @@ -580,7 +577,7 @@ public class Shard extends RaftActor { shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); - LOG.error(e, "{}: Failed to commit", persistenceId()); + LOG.error("{}: Failed to commit", persistenceId(), e); } } @@ -667,7 +664,7 @@ public class Shard extends RaftActor { try { currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { - LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId()); + LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); } } else if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); @@ -722,7 +719,7 @@ public class Shard extends RaftActor { shardMBean.incrementCommittedTransactionCount(); } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); - LOG.error(e, "{}: Failed to commit", persistenceId()); + LOG.error("{}: Failed to commit", persistenceId(), e); } } } @@ -752,7 +749,7 @@ public class Shard extends RaftActor { try { applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { - LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId()); + LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); } } else if (data instanceof CompositeModificationPayload) { @@ -835,7 +832,7 @@ public class Shard extends RaftActor { transaction.write(DATASTORE_ROOT, node); syncCommitTransaction(transaction); } catch (InterruptedException | ExecutionException e) { - LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId()); + LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e); } finally { LOG.info("{}: Done applying snapshot", persistenceId()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 165e272d8b..8b95404c4e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Status; -import akka.event.LoggingAdapter; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.LinkedList; @@ -20,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -36,11 +36,11 @@ public class ShardCommitCoordinator { private final int queueCapacity; - private final LoggingAdapter log; + private final Logger log; private final String name; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log, + public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log, String name) { cohortCache = CacheBuilder.newBuilder().expireAfterAccess( cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 22e2dbd47d..3dbac003b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -15,8 +15,6 @@ import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.japi.Function; import akka.japi.Procedure; @@ -54,6 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; /** @@ -67,8 +67,7 @@ import scala.concurrent.duration.Duration; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + private final Logger LOG = LoggerFactory.getLogger(getClass()); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -97,17 +96,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final DataPersistenceProvider dataPersistenceProvider; /** - * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be - * configuration or operational */ - protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + protected ShardManager(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { - this.type = Preconditions.checkNotNull(type, "type should not be null"); this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); + this.type = datastoreContext.getDataStoreType(); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -119,16 +116,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); } - public static Props props(final String type, + public static Props props( final ClusterWrapper cluster, final Configuration configuration, final DatastoreContext datastoreContext) { - Preconditions.checkNotNull(type, "type should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); - return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); } @Override @@ -186,7 +182,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { knownModules = ImmutableSet.copyOf(msg.getModules()); } else if (message instanceof RecoveryFailure) { RecoveryFailure failure = (RecoveryFailure) message; - LOG.error(failure.cause(), "Recovery failed"); + LOG.error("Recovery failed", failure.cause()); } else if (message instanceof RecoveryCompleted) { LOG.info("Recovery complete : {}", persistenceId()); @@ -424,12 +420,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { - StringBuilder sb = new StringBuilder(); - for(StackTraceElement element : t.getStackTrace()) { - sb.append("\n\tat ") - .append(element.toString()); - } - LOG.warning("Supervisor Strategy of resume applied {}",sb.toString()); + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); return SupervisorStrategy.resume(); } } @@ -535,14 +526,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private static class ShardManagerCreator implements Creator { private static final long serialVersionUID = 1L; - final String type; final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; - ShardManagerCreator(String type, ClusterWrapper cluster, + ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { - this.type = type; this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; @@ -550,7 +539,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public ShardManager create() throws Exception { - return new ShardManager(type, cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 2a97036883..50528575e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore; -import akka.event.LoggingAdapter; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; @@ -22,6 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; /** * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot @@ -40,10 +40,10 @@ class ShardRecoveryCoordinator { private final SchemaContext schemaContext; private final String shardName; private final ExecutorService executor; - private final LoggingAdapter log; + private final Logger log; private final String name; - ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log, + ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log, String name) { this.schemaContext = schemaContext; this.shardName = shardName; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java index 0c3d33a78c..6dd0ab1230 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java @@ -10,16 +10,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Terminated; import akka.actor.UntypedActor; -import akka.event.Logging; -import akka.event.LoggingAdapter; import org.opendaylight.controller.cluster.datastore.messages.Monitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TerminationMonitor extends UntypedActor{ - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class); public TerminationMonitor(){ - LOG.info("Created TerminationMonitor"); + LOG.debug("Created TerminationMonitor"); } @Override public void onReceive(Object message) throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 932c36fe34..4f472266c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -44,6 +47,19 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho private final List> cohortFutures; private volatile List cohorts; private final String transactionId; + private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() { + @Override + public void run() { + } + + @Override + public void success() { + } + + @Override + public void failure() { + } + }; public ThreePhaseCommitCohortProxy(ActorContext actorContext, List> cohortFutures, String transactionId) { @@ -151,8 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(LOG.isDebugEnabled()) { LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort); } - - futureList.add(actorContext.executeOperationAsync(cohort, message)); + futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout())); } return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); @@ -179,12 +194,20 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho @Override public ListenableFuture commit() { - return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), - CommitTransactionReply.SERIALIZABLE_CLASS, true); + OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK : + new CommitCallback(actorContext); + + return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(), + CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback); + } + + private ListenableFuture voidOperation(final String operationName, final Object message, + final Class expectedResponseClass, final boolean propagateException) { + return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK); } private ListenableFuture voidOperation(final String operationName, final Object message, - final Class expectedResponseClass, final boolean propagateException) { + final Class expectedResponseClass, final boolean propagateException, final OperationCallback callback) { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} {}", transactionId, operationName); @@ -196,7 +219,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho if(cohorts != null) { finishVoidOperation(operationName, message, expectedResponseClass, propagateException, - returnFuture); + returnFuture, callback); } else { buildCohortList().onComplete(new OnComplete() { @Override @@ -213,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } } else { finishVoidOperation(operationName, message, expectedResponseClass, - propagateException, returnFuture); + propagateException, returnFuture, callback); } } }, actorContext.getActorSystem().dispatcher()); @@ -223,11 +246,14 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } private void finishVoidOperation(final String operationName, final Object message, - final Class expectedResponseClass, final boolean propagateException, - final SettableFuture returnFuture) { + final Class expectedResponseClass, final boolean propagateException, + final SettableFuture returnFuture, final OperationCallback callback) { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} finish {}", transactionId, operationName); } + + callback.run(); + Future> combinedFuture = invokeCohorts(message); combinedFuture.onComplete(new OnComplete>() { @@ -247,6 +273,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } if(exceptionToPropagate != null) { + if(LOG.isDebugEnabled()) { LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId, operationName, exceptionToPropagate); @@ -265,11 +292,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } returnFuture.set(null); } + + callback.failure(); } else { + if(LOG.isDebugEnabled()) { LOG.debug("Tx {}: {} succeeded", transactionId, operationName); } returnFuture.set(null); + + callback.success(); } } }, actorContext.getActorSystem().dispatcher()); @@ -279,4 +311,58 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho List> getCohortFutures() { return Collections.unmodifiableList(cohortFutures); } + + private static interface OperationCallback { + void run(); + void success(); + void failure(); + } + + private static class CommitCallback implements OperationCallback{ + + private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class); + private static final String COMMIT = "commit"; + + private final Timer commitTimer; + private final ActorContext actorContext; + private Timer.Context timerContext; + + CommitCallback(ActorContext actorContext){ + this.actorContext = actorContext; + commitTimer = actorContext.getOperationTimer(COMMIT); + } + + @Override + public void run() { + timerContext = commitTimer.time(); + } + + @Override + public void success() { + timerContext.stop(); + + Snapshot timerSnapshot = commitTimer.getSnapshot(); + double allowedLatencyInNanos = timerSnapshot.get98thPercentile(); + + long commitTimeoutInSeconds = actorContext.getDatastoreContext() + .getShardTransactionCommitTimeoutInSeconds(); + long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds); + + // Here we are trying to find out how many transactions per second are allowed + double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds; + + LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}", + actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos); + + actorContext.setTxCreationLimit(newRateLimit); + } + + @Override + public void failure() { + // This would mean we couldn't get a transaction completed in 30 seconds which is + // the default transaction commit timeout. Using the timeout information to figure out the rate limit is + // not going to be useful - so we leave it as it is + } + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 87959efe8a..ee3a5cc825 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java index 30ab97ceb1..f05ef91fc5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java @@ -7,17 +7,17 @@ */ package org.opendaylight.controller.cluster.datastore.compat; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.japi.Creator; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Creator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit @@ -28,7 +28,7 @@ import akka.japi.Creator; */ public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor { - private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); + private static final Logger LOG = LoggerFactory.getLogger(BackwardsCompatibleThreePhaseCommitCohort.class); private final String transactionId; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index c9fdf38931..cb06c898fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -18,9 +18,13 @@ import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -54,11 +58,11 @@ import scala.concurrent.duration.FiniteDuration; * but should not be passed to actors especially remote actors */ public class ActorContext { - private static final Logger - LOG = LoggerFactory.getLogger(ActorContext.class); - - public static final String MAILBOX = "bounded-mailbox"; - + private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); + private static final String UNKNOWN_DATA_STORE_TYPE = "unknown"; + private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; + private static final String METRIC_RATE = "rate"; + private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore"; private static final Mapper FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper() { @Override @@ -74,17 +78,23 @@ public class ActorContext { return actualFailure; } }; + public static final String MAILBOX = "bounded-mailbox"; private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; private final Configuration configuration; private final DatastoreContext datastoreContext; - private volatile SchemaContext schemaContext; private final FiniteDuration operationDuration; private final Timeout operationTimeout; private final String selfAddressHostPort; + private final RateLimiter txRateLimiter; + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; + private final Timeout transactionCommitOperationTimeout; + + private volatile SchemaContext schemaContext; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -100,10 +110,13 @@ public class ActorContext { this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit()); - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), - TimeUnit.SECONDS); + operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); + transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), + TimeUnit.SECONDS)); + Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -113,6 +126,7 @@ public class ActorContext { } transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); + jmxReporter.start(); } public DatastoreContext getDatastoreContext() { @@ -446,4 +460,59 @@ public class ActorContext { public int getTransactionOutstandingOperationLimit(){ return transactionOutstandingOperationLimit; } + + /** + * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow + * us to create a timer for pretty much anything. + * + * @param operationName + * @return + */ + public Timer getOperationTimer(String operationName){ + final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE); + return metricRegistry.timer(rate); + } + + /** + * Get the type of the data store to which this ActorContext belongs + * + * @return + */ + public String getDataStoreType() { + return datastoreContext.getDataStoreType(); + } + + /** + * Set the number of transaction creation permits that are to be allowed + * + * @param permitsPerSecond + */ + public void setTxCreationLimit(double permitsPerSecond){ + txRateLimiter.setRate(permitsPerSecond); + } + + /** + * Get the current transaction creation rate limit + * @return + */ + public double getTxCreationLimit(){ + return txRateLimiter.getRate(); + } + + /** + * Try to acquire a transaction creation permit. Will block if no permits are available. + */ + public void acquireTxCreationPermit(){ + txRateLimiter.acquire(); + } + + /** + * Return the operation timeout to be used when committing transactions + * @return + */ + public Timeout getTransactionCommitOperationTimeout(){ + return transactionCommitOperationTimeout; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 711c6a37b5..7e8307465b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -41,7 +41,7 @@ public class DistributedConfigDataStoreProviderModule extends } DatastoreContext datastoreContext = DatastoreContext.newBuilder() - .dataStoreMXBeanType("DistributedConfigDatastore") + .dataStoreType("config") .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create( props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(), props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(), @@ -67,9 +67,10 @@ public class DistributedConfigDataStoreProviderModule extends .shardIsolatedLeaderCheckIntervalInMillis( props.getShardIsolatedLeaderCheckIntervalInMillis().getValue()) .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) + .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue()) .build(); - return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(), + return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(), datastoreContext, bundleContext); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index d9df06df1c..0655468531 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -41,7 +41,7 @@ public class DistributedOperationalDataStoreProviderModule extends } DatastoreContext datastoreContext = DatastoreContext.newBuilder() - .dataStoreMXBeanType("DistributedOperationalDatastore") + .dataStoreType("operational") .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create( props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(), props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(), @@ -67,10 +67,11 @@ public class DistributedOperationalDataStoreProviderModule extends .shardIsolatedLeaderCheckIntervalInMillis( props.getShardIsolatedLeaderCheckIntervalInMillis().getValue()) .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue()) + .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue()) .build(); - return DistributedDataStoreFactory.createInstance("operational", - getOperationalSchemaServiceDependency(), datastoreContext, bundleContext); + return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(), + datastoreContext, bundleContext); } public void setBundleContext(BundleContext bundleContext) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 46cd50d0c1..e2ee7373d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -180,6 +180,14 @@ module distributed-datastore-provider { description "The interval at which the leader of the shard will check if its majority followers are active and term itself as isolated"; } + + leaf tx-creation-initial-rate-limit { + default 100; + type non-zero-uint32-type; + description "The initial number of transactions per second that are allowed before the data store + should begin applying back pressure. This number is only used as an initial guidance, + subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit"; + } } // Augments the 'configuration' choice node under modules/module. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java new file mode 100644 index 0000000000..3e89823718 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java @@ -0,0 +1,37 @@ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.Test; + +public class DatastoreContextTest { + + private DatastoreContext.Builder builder; + + @Before + public void setUp(){ + builder = new DatastoreContext.Builder(); + } + + @Test + public void testDefaults(){ + DatastoreContext build = builder.build(); + + assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout()); + assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds()); + assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds()); + assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize()); + assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount()); + assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length()); + assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity()); + assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout()); + assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout()); + assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent()); + assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader()); + assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length()); + assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage()); + assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor()); + assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit()); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 9f5aded352..1ad2be7af1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -790,8 +790,11 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(config); + datastoreContextBuilder.dataStoreType(typeName); + DatastoreContext datastoreContext = datastoreContextBuilder.build(); - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster, + + DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext); SchemaContext schemaContext = SchemaContextHelper.full(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java new file mode 100644 index 0000000000..66fa876277 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -0,0 +1,60 @@ +package org.opendaylight.controller.cluster.datastore; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class DistributedDataStoreTest extends AbstractActorTest { + + private SchemaContext schemaContext; + + @Mock + private ActorContext actorContext; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(schemaContext).when(actorContext).getSchemaContext(); + } + + @Test + public void testRateLimitingUsedInReadWriteTxCreation(){ + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + distributedDataStore.newReadWriteTransaction(); + + verify(actorContext, times(1)).acquireTxCreationPermit(); + } + + @Test + public void testRateLimitingUsedInWriteOnlyTxCreation(){ + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + distributedDataStore.newWriteOnlyTransaction(); + + verify(actorContext, times(1)).acquireTxCreationPermit(); + } + + + @Test + public void testRateLimitingNotUsedInReadOnlyTxCreation(){ + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + distributedDataStore.newReadOnlyTransaction(); + distributedDataStore.newReadOnlyTransaction(); + distributedDataStore.newReadOnlyTransaction(); + + verify(actorContext, times(0)).acquireTxCreationPermit(); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 8c56efd413..596761ddc8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,5 +1,10 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; @@ -11,6 +16,13 @@ import akka.util.Timeout; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import java.net.URI; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -35,20 +47,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; -import java.net.URI; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; @@ -73,8 +71,10 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().build()); + + DatastoreContext.Builder builder = DatastoreContext.newBuilder(); + builder.dataStoreType(shardMrgIDSuffix); + return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build()); } @Test @@ -351,10 +351,8 @@ public class ShardManagerTest extends AbstractActorTest { public void testRecoveryApplicable(){ new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props(shardMrgIDSuffix, - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build()); + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build()); final TestActorRef persistentShardManager = TestActorRef.create(getSystem(), persistentProps); @@ -362,10 +360,8 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); - final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix, - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(false).build()); + final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build()); final TestActorRef nonPersistentShardManager = TestActorRef.create(getSystem(), nonPersistentProps); @@ -386,7 +382,8 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) { @Override protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { DataPersistenceProviderMonitor dataPersistenceProviderMonitor @@ -426,8 +423,8 @@ public class ShardManagerTest extends AbstractActorTest { private final CountDownLatch recoveryComplete = new CountDownLatch(1); TestShardManager(String shardMrgIDSuffix) { - super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().build()); + super(new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 75c93dd5d2..d2396e0524 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -3,14 +3,20 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.Futures; +import akka.util.Timeout; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import java.util.List; @@ -43,11 +49,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { @Mock private ActorContext actorContext; + @Mock + private DatastoreContext datastoreContext; + + @Mock + private Timer commitTimer; + + @Mock + private Timer.Context commitTimerContext; + + @Mock + private Snapshot commitSnapshot; + @Before public void setUp() { MockitoAnnotations.initMocks(this); doReturn(getSystem()).when(actorContext).getActorSystem(); + doReturn(datastoreContext).when(actorContext).getDatastoreContext(); + doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds(); + doReturn(commitTimer).when(actorContext).getOperationTimer("commit"); + doReturn(commitTimerContext).when(commitTimer).time(); + doReturn(commitSnapshot).when(commitTimer).getSnapshot(); + doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile(); + doReturn(10.0).when(actorContext).getTxCreationLimit(); } private Future newCohort() { @@ -86,12 +111,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { } stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), - isA(requestType)); + isA(requestType), any(Timeout.class)); } private void verifyCohortInvocations(int nCohorts, Class requestType) { verify(actorContext, times(nCohorts)).executeOperationAsync( - any(ActorSelection.class), isA(requestType)); + any(ActorSelection.class), isA(requestType), any(Timeout.class)); } private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { @@ -276,8 +301,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { try { propagateExecutionExceptionCause(proxy.commit()); } finally { + + verify(actorContext, never()).setTxCreationLimit(anyLong()); verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS); } + } @Test @@ -294,11 +322,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), new CommitTransactionReply()); + assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + proxy.canCommit().get(5, TimeUnit.SECONDS); proxy.preCommit().get(5, TimeUnit.SECONDS); proxy.commit().get(5, TimeUnit.SECONDS); verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + + // Verify that the creation limit was changed to 0.5 (based on setup) + verify(actorContext, timeout(5000)).setTxCreationLimit(0.5); + } + + @Test + public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(0); + + assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15); + + proxy.canCommit().get(5, TimeUnit.SECONDS); + proxy.preCommit().get(5, TimeUnit.SECONDS); + proxy.commit().get(5, TimeUnit.SECONDS); + + verify(actorContext, never()).setTxCreationLimit(anyLong()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index dd37371a45..23c3a82a38 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; @@ -29,10 +32,17 @@ public class TransactionChainProxyTest extends AbstractActorTest{ ActorContext actorContext = null; SchemaContext schemaContext = mock(SchemaContext.class); + @Mock + ActorContext mockActorContext; + @Before public void setUp() { + MockitoAnnotations.initMocks(this); + actorContext = new MockActorContext(getSystem()); actorContext.setSchemaContext(schemaContext); + + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); } @SuppressWarnings("resource") @@ -76,4 +86,32 @@ public class TransactionChainProxyTest extends AbstractActorTest{ Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId()); } + + @Test + public void testRateLimitingUsedInReadWriteTxCreation(){ + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + txChainProxy.newReadWriteTransaction(); + + verify(mockActorContext, times(1)).acquireTxCreationPermit(); + } + + @Test + public void testRateLimitingUsedInWriteOnlyTxCreation(){ + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + txChainProxy.newWriteOnlyTransaction(); + + verify(mockActorContext, times(1)).acquireTxCreationPermit(); + } + + + @Test + public void testRateLimitingNotUsedInReadOnlyTxCreation(){ + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + txChainProxy.newReadOnlyTransaction(); + + verify(mockActorContext, times(0)).acquireTxCreationPermit(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index e4ab969f5c..eae46da2ee 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -12,10 +13,12 @@ import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.time.StopWatch; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -265,4 +268,35 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(expected, actual); } + @Test + public void testRateLimiting(){ + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext); + + // Check that the initial value is being picked up from DataStoreContext + assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + + actorContext.setTxCreationLimit(1.0); + + assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); + + + StopWatch watch = new StopWatch(); + + watch.start(); + + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + actorContext.acquireTxCreationPermit(); + + watch.stop(); + + assertTrue("did not take as much time as expected", watch.getTime() > 1000); + } } diff --git a/opendaylight/md-sal/sal-netconf-connector/pom.xml b/opendaylight/md-sal/sal-netconf-connector/pom.xml index c8836d1b88..add889fa3e 100644 --- a/opendaylight/md-sal/sal-netconf-connector/pom.xml +++ b/opendaylight/md-sal/sal-netconf-connector/pom.xml @@ -61,6 +61,10 @@ org.opendaylight.controller.model model-inventory + + org.opendaylight.yangtools.model + ietf-topology + org.opendaylight.controller sal-broker-impl diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java index 97e294016d..460e072d9a 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java @@ -26,7 +26,7 @@ import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.controller.sal.connect.netconf.NetconfDevice; import org.opendaylight.controller.sal.connect.netconf.NetconfStateSchemas; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceSalFacade; import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -50,7 +50,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co private static final Logger logger = LoggerFactory.getLogger(NetconfConnectorModule.class); private BundleContext bundleContext; - private Optional userCapabilities; + private Optional userCapabilities; private SchemaSourceRegistry schemaRegistry; private SchemaContextFactory schemaContextFactory; @@ -97,14 +97,14 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co @Override public java.lang.AutoCloseable createInstance() { - final RemoteDeviceId id = new RemoteDeviceId(getIdentifier()); + final RemoteDeviceId id = new RemoteDeviceId(getIdentifier(), getSocketAddress()); final ExecutorService globalProcessingExecutor = getProcessingExecutorDependency().getExecutor(); final Broker domBroker = getDomRegistryDependency(); final BindingAwareBroker bindingBroker = getBindingRegistryDependency(); - final RemoteDeviceHandler salFacade + final RemoteDeviceHandler salFacade = new NetconfDeviceSalFacade(id, domBroker, bindingBroker, bundleContext, globalProcessingExecutor); final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = @@ -124,7 +124,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co return new MyAutoCloseable(listener, salFacade); } - private Optional getUserCapabilities() { + private Optional getUserCapabilities() { if(getYangModuleCapabilities() == null) { return Optional.absent(); } @@ -134,7 +134,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co return Optional.absent(); } - final NetconfSessionCapabilities parsedOverrideCapabilities = NetconfSessionCapabilities.fromStrings(capabilities); + final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities); JmxAttributeValidationException.checkCondition( parsedOverrideCapabilities.getNonModuleCaps().isEmpty(), "Capabilities to override can only contain module based capabilities, non-module capabilities will be retrieved from the device," + @@ -170,11 +170,11 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co } private static final class MyAutoCloseable implements AutoCloseable { - private final RemoteDeviceHandler salFacade; + private final RemoteDeviceHandler salFacade; private final NetconfDeviceCommunicator listener; public MyAutoCloseable(final NetconfDeviceCommunicator listener, - final RemoteDeviceHandler salFacade) { + final RemoteDeviceHandler salFacade) { this.listener = listener; this.salFacade = salFacade; } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java index e0d24331a7..9423dbf1d2 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java @@ -16,5 +16,7 @@ public interface RemoteDevice { void onRemoteSessionDown(); + void onRemoteSessionFailed(Throwable throwable); + void onNotification(M notification); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java index 269c4af82f..c5a0ae2544 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java @@ -18,6 +18,8 @@ public interface RemoteDeviceHandler extends AutoCloseable { void onDeviceDisconnected(); + void onDeviceFailed(Throwable throwable); + void onNotification(CompositeNode domNotification); void close(); diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index 31779a7817..39340fa166 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -30,10 +31,12 @@ import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; @@ -51,7 +54,7 @@ import org.slf4j.LoggerFactory; /** * This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade */ -public final class NetconfDevice implements RemoteDevice { +public final class NetconfDevice implements RemoteDevice { private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class); @@ -65,7 +68,7 @@ public final class NetconfDevice implements RemoteDevice salFacade; + private final RemoteDeviceHandler salFacade; private final ListeningExecutorService processingExecutor; private final SchemaSourceRegistry schemaRegistry; private final MessageTransformer messageTransformer; @@ -73,7 +76,7 @@ public final class NetconfDevice implements RemoteDevice> sourceRegistrations = Lists.newArrayList(); - public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, + public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler salFacade, final ExecutorService globalProcessingExecutor, final MessageTransformer messageTransformer) { this.id = id; this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry(); @@ -86,7 +89,7 @@ public final class NetconfDevice implements RemoteDevice listener) { // SchemaContext setup has to be performed in a dedicated thread since // we are in a netty thread in this method @@ -119,9 +122,10 @@ public final class NetconfDevice implements RemoteDevice { private final NetconfDeviceRpc deviceRpc; - private final NetconfSessionCapabilities remoteSessionCapabilities; + private final NetconfSessionPreferences remoteSessionCapabilities; private final RemoteDeviceId id; private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver; - public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { + public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) { this.deviceRpc = deviceRpc; this.remoteSessionCapabilities = remoteSessionCapabilities; this.id = id; @@ -287,15 +296,17 @@ public final class NetconfDevice implements RemoteDevice listener; + private NetconfDeviceCapabilities capabilities; - public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { + public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator listener) { this.deviceSources = deviceSources; this.remoteSessionCapabilities = remoteSessionCapabilities; this.deviceRpc = deviceRpc; this.listener = listener; + this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities(); } @Override @@ -306,6 +317,7 @@ public final class NetconfDevice implements RemoteDevice requiredSources) { logger.trace("{}: Trying to build schema context from {}", id, requiredSources); @@ -322,6 +334,9 @@ public final class NetconfDevice implements RemoteDevice filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet()); + capabilities.addCapabilities(filteredQNames); + capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps()); handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc); } @@ -331,12 +346,15 @@ public final class NetconfDevice implements RemoteDevice unresolvedSources = resolutionException.getUnsatisfiedImports().keySet(); + capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve); logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports()); setUpSchema(resolutionException.getResolvedSources()); // unknown error, fail @@ -355,5 +373,29 @@ public final class NetconfDevice implements RemoteDevice getQNameFromSourceIdentifiers(Collection identifiers) { + Collection qNames = new HashSet<>(); + for (SourceIdentifier source : identifiers) { + Optional qname = getQNameFromSourceIdentifier(source); + if (qname.isPresent()) { + qNames.add(qname.get()); + } + } + if (qNames.isEmpty()) { + logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers); + } + return qNames; + } + + private Optional getQNameFromSourceIdentifier(SourceIdentifier identifier) { + for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) { + if (qname.getLocalName().equals(identifier.getName()) + && qname.getFormattedRevision().equals(identifier.getRevision())) { + return Optional.of(qname); + } + } + throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier); + } } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java index d758073a8e..68c1a5c6a8 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java @@ -11,7 +11,7 @@ import java.net.URI; import java.util.Collections; import java.util.Set; import java.util.concurrent.ExecutionException; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -40,7 +40,7 @@ public final class NetconfStateSchemas { * Factory for NetconfStateSchemas */ public interface NetconfStateSchemasResolver { - NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id); + NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id); } /** @@ -49,7 +49,7 @@ public final class NetconfStateSchemas { public static final class NetconfStateSchemasResolverImpl implements NetconfStateSchemasResolver { @Override - public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) { + public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) { return NetconfStateSchemas.create(deviceRpc, remoteSessionCapabilities, id); } } @@ -91,7 +91,7 @@ public final class NetconfStateSchemas { /** * Issue get request to remote device and parse response to find all schemas under netconf-state/schemas */ - private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) { + private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) { if(remoteSessionCapabilities.isMonitoringSupported() == false) { logger.warn("{}: Netconf monitoring not supported on device, cannot detect provided schemas"); return EMPTY; diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCapabilities.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCapabilities.java new file mode 100644 index 0000000000..8f30a5c63a --- /dev/null +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCapabilities.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connect.netconf.listener; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason; +import org.opendaylight.yangtools.yang.common.QName; + +public final class NetconfDeviceCapabilities { + private final Map unresolvedCapabilites; + private final Set resolvedCapabilities; + + private final Set nonModuleBasedCapabilities; + + public NetconfDeviceCapabilities() { + this.unresolvedCapabilites = new HashMap<>(); + this.resolvedCapabilities = new HashSet<>(); + this.nonModuleBasedCapabilities = new HashSet<>(); + } + + public void addUnresolvedCapability(QName source, FailureReason reason) { + unresolvedCapabilites.put(source, reason); + } + + public void addUnresolvedCapabilities(Collection capabilities, FailureReason reason) { + for (QName s : capabilities) { + unresolvedCapabilites.put(s, reason); + } + } + + public void addCapabilities(Collection availableSchemas) { + resolvedCapabilities.addAll(availableSchemas); + } + + public void addNonModuleBasedCapabilities(Collection nonModuleCapabilities) { + this.nonModuleBasedCapabilities.addAll(nonModuleCapabilities); + } + + public Set getNonModuleBasedCapabilities() { + return nonModuleBasedCapabilities; + } + + public Map getUnresolvedCapabilites() { + return unresolvedCapabilites; + } + + public Set getResolvedCapabilities() { + return resolvedCapabilities; + } + +} diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index aadb911f45..556fc2f1d2 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -14,6 +14,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GenericFutureListener; import java.util.ArrayDeque; import java.util.Iterator; import java.util.List; @@ -46,8 +47,8 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class); - private final RemoteDevice remoteDevice; - private final Optional overrideNetconfCapabilities; + private final RemoteDevice remoteDevice; + private final Optional overrideNetconfCapabilities; private final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); @@ -56,18 +57,18 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private NetconfClientSession session; private Future initFuture; - public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final NetconfSessionCapabilities netconfSessionCapabilities) { - this(id, remoteDevice, Optional.of(netconfSessionCapabilities)); + public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, + final NetconfSessionPreferences netconfSessionPreferences) { + this(id, remoteDevice, Optional.of(netconfSessionPreferences)); } public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice) { - this(id, remoteDevice, Optional.absent()); + final RemoteDevice remoteDevice) { + this(id, remoteDevice, Optional.absent()); } - private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, - final Optional overrideNetconfCapabilities) { + private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, + final Optional overrideNetconfCapabilities) { this.id = id; this.remoteDevice = remoteDevice; this.overrideNetconfCapabilities = overrideNetconfCapabilities; @@ -80,16 +81,16 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, logger.debug("{}: Session established", id); this.session = session; - NetconfSessionCapabilities netconfSessionCapabilities = - NetconfSessionCapabilities.fromNetconfSession(session); - logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities); + NetconfSessionPreferences netconfSessionPreferences = + NetconfSessionPreferences.fromNetconfSession(session); + logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences); if(overrideNetconfCapabilities.isPresent()) { - netconfSessionCapabilities = netconfSessionCapabilities.replaceModuleCaps(overrideNetconfCapabilities.get()); - logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionCapabilities); + netconfSessionPreferences = netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get()); + logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionPreferences); } - remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this); + remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this); } finally { sessionLock.unlock(); @@ -103,6 +104,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } else { initFuture = dispatch.createClient(config); } + + initFuture.addListener(new GenericFutureListener>(){ + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + logger.debug("{}: Connection failed", id, future.cause()); + NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); + } + } + }); } private void tearDown( String reason ) { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferences.java similarity index 88% rename from opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java rename to opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferences.java index d5b3778b4f..572885bcef 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferences.java @@ -18,7 +18,7 @@ import org.opendaylight.yangtools.yang.common.QName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class NetconfSessionCapabilities { +public final class NetconfSessionPreferences { private static final class ParameterMatcher { private final Predicate predicate; @@ -45,7 +45,7 @@ public final class NetconfSessionCapabilities { } } - private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionCapabilities.class); + private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionPreferences.class); private static final ParameterMatcher MODULE_PARAM = new ParameterMatcher("module="); private static final ParameterMatcher REVISION_PARAM = new ParameterMatcher("revision="); private static final ParameterMatcher BROKEN_REVISON_PARAM = new ParameterMatcher("amp;revision="); @@ -60,7 +60,7 @@ public final class NetconfSessionCapabilities { private final Set moduleBasedCaps; private final Set nonModuleCaps; - private NetconfSessionCapabilities(final Set nonModuleCaps, final Set moduleBasedCaps) { + private NetconfSessionPreferences(final Set nonModuleCaps, final Set moduleBasedCaps) { this.nonModuleCaps = Preconditions.checkNotNull(nonModuleCaps); this.moduleBasedCaps = Preconditions.checkNotNull(moduleBasedCaps); } @@ -110,17 +110,17 @@ public final class NetconfSessionCapabilities { || containsNonModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString()); } - public NetconfSessionCapabilities replaceModuleCaps(final NetconfSessionCapabilities netconfSessionModuleCapabilities) { + public NetconfSessionPreferences replaceModuleCaps(final NetconfSessionPreferences netconfSessionModuleCapabilities) { final Set moduleBasedCaps = Sets.newHashSet(netconfSessionModuleCapabilities.getModuleBasedCaps()); // Preserve monitoring module, since it indicates support for ietf-netconf-monitoring if(containsModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING)) { moduleBasedCaps.add(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING); } - return new NetconfSessionCapabilities(getNonModuleCaps(), moduleBasedCaps); + return new NetconfSessionPreferences(getNonModuleCaps(), moduleBasedCaps); } - public static NetconfSessionCapabilities fromNetconfSession(final NetconfClientSession session) { + public static NetconfSessionPreferences fromNetconfSession(final NetconfClientSession session) { return fromStrings(session.getServerCapabilities()); } @@ -132,7 +132,7 @@ public final class NetconfSessionCapabilities { return QName.cachedReference(QName.create(URI.create(namespace), null, moduleName).withoutRevision()); } - public static NetconfSessionCapabilities fromStrings(final Collection capabilities) { + public static NetconfSessionPreferences fromStrings(final Collection capabilities) { final Set moduleBasedCaps = new HashSet<>(); final Set nonModuleCaps = Sets.newHashSet(capabilities); @@ -176,7 +176,7 @@ public final class NetconfSessionCapabilities { addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, moduleName)); } - return new NetconfSessionCapabilities(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps)); + return new NetconfSessionPreferences(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps)); } @@ -184,4 +184,12 @@ public final class NetconfSessionCapabilities { moduleBasedCaps.add(qName); nonModuleCaps.remove(capability); } + + private NetconfDeviceCapabilities capabilities = new NetconfDeviceCapabilities(); + + public NetconfDeviceCapabilities getNetconfDeviceCapabilities() { + return capabilities; + } + + } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataBroker.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataBroker.java index aa22e877a4..87ca11de87 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataBroker.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataBroker.java @@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.tx.ReadOnlyTx; import org.opendaylight.controller.sal.connect.netconf.sal.tx.ReadWriteTx; import org.opendaylight.controller.sal.connect.netconf.sal.tx.WriteCandidateTx; @@ -33,10 +33,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; final class NetconfDeviceDataBroker implements DOMDataBroker { private final RemoteDeviceId id; private final NetconfBaseOps netconfOps; - private final NetconfSessionCapabilities netconfSessionPreferences; + private final NetconfSessionPreferences netconfSessionPreferences; private final DataNormalizer normalizer; - public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, final NetconfSessionCapabilities netconfSessionPreferences) { + public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, final NetconfSessionPreferences netconfSessionPreferences) { this.id = id; this.netconfOps = new NetconfBaseOps(rpc); this.netconfSessionPreferences = netconfSessionPreferences; diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java index fc69a7e253..3715969b2b 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; * * All data changes are submitted to an ExecutorService to avoid Thread blocking while sal is waiting for schema. */ +@Deprecated final class NetconfDeviceDatastoreAdapter implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceDatastoreAdapter.class); diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java index bdeb129d55..db8a238242 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java @@ -16,7 +16,8 @@ import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcImplementation; @@ -36,7 +37,7 @@ import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler { +public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler { private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceSalFacade.class); @@ -63,7 +64,7 @@ public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDevice @Override public synchronized void onDeviceConnected(final SchemaContext schemaContext, - final NetconfSessionCapabilities netconfSessionPreferences, final RpcImplementation deviceRpc) { + final NetconfSessionPreferences netconfSessionPreferences, final RpcImplementation deviceRpc) { // TODO move SchemaAwareRpcBroker from sal-broker-impl, now we have depend on the whole sal-broker-impl final RpcProvisionRegistry rpcRegistry = new SchemaAwareRpcBroker(id.getPath().toString(), new SchemaContextProvider() { @@ -93,12 +94,23 @@ public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDevice salProvider.getMountInstance().onDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService); salProvider.getDatastoreAdapter().updateDeviceState(true, netconfSessionPreferences.getModuleBasedCaps()); + salProvider.getMountInstance().onTopologyDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService); + salProvider.getTopologyDatastoreAdapter().updateDeviceData(true, netconfSessionPreferences.getNetconfDeviceCapabilities()); } @Override public synchronized void onDeviceDisconnected() { salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.emptySet()); + salProvider.getTopologyDatastoreAdapter().updateDeviceData(false, new NetconfDeviceCapabilities()); salProvider.getMountInstance().onDeviceDisconnected(); + salProvider.getMountInstance().onTopologyDeviceDisconnected(); + } + + @Override + public void onDeviceFailed(Throwable throwable) { + salProvider.getTopologyDatastoreAdapter().setDeviceAsFailed(throwable); + salProvider.getMountInstance().onDeviceDisconnected(); + salProvider.getMountInstance().onTopologyDeviceDisconnected(); } private void registerRpcsToSal(final SchemaContext schemaContext, final RpcProvisionRegistry rpcRegistry, final RpcImplementation deviceRpc) { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java index 171f2f4b0b..dfae165d30 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java @@ -37,6 +37,8 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding private volatile NetconfDeviceDatastoreAdapter datastoreAdapter; private MountInstance mountInstance; + private volatile NetconfDeviceTopologyAdapter topologyDatastoreAdapter; + public NetconfDeviceSalProvider(final RemoteDeviceId deviceId, final ExecutorService executor) { this.id = deviceId; this.executor = executor; @@ -54,6 +56,12 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding return datastoreAdapter; } + public NetconfDeviceTopologyAdapter getTopologyDatastoreAdapter() { + Preconditions.checkState(topologyDatastoreAdapter != null, + "%s: Sal provider %s was not initialized by sal. Cannot get topology datastore adapter", id); + return topologyDatastoreAdapter; + } + @Override public void onSessionInitiated(final Broker.ProviderSession session) { logger.debug("{}: (BI)Session with sal established {}", id, session); @@ -75,6 +83,8 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding final DataBroker dataBroker = session.getSALService(DataBroker.class); datastoreAdapter = new NetconfDeviceDatastoreAdapter(id, dataBroker); + + topologyDatastoreAdapter = new NetconfDeviceTopologyAdapter(id, dataBroker); } public void close() throws Exception { @@ -90,11 +100,14 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding private ObjectRegistration registration; private NotificationPublishService notificationSerivce; + private ObjectRegistration topologyRegistration; + MountInstance(final DOMMountPointService mountService, final RemoteDeviceId id) { this.mountService = Preconditions.checkNotNull(mountService); this.id = Preconditions.checkNotNull(id); } + @Deprecated synchronized void onDeviceConnected(final SchemaContext initialCtx, final DOMDataBroker broker, final RpcProvisionRegistry rpc, final NotificationPublishService notificationSerivce) { @@ -113,6 +126,7 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding registration = mountBuilder.register(); } + @Deprecated synchronized void onDeviceDisconnected() { if(registration == null) { return; @@ -128,10 +142,44 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding } } + synchronized void onTopologyDeviceConnected(final SchemaContext initialCtx, + final DOMDataBroker broker, final RpcProvisionRegistry rpc, + final NotificationPublishService notificationSerivce) { + + Preconditions.checkNotNull(mountService, "Closed"); + Preconditions.checkState(topologyRegistration == null, "Already initialized"); + + final DOMMountPointService.DOMMountPointBuilder mountBuilder = mountService.createMountPoint(id.getTopologyPath()); + mountBuilder.addInitialSchemaContext(initialCtx); + + mountBuilder.addService(DOMDataBroker.class, broker); + mountBuilder.addService(RpcProvisionRegistry.class, rpc); + this.notificationSerivce = notificationSerivce; + mountBuilder.addService(NotificationPublishService.class, notificationSerivce); + + topologyRegistration = mountBuilder.register(); + } + + synchronized void onTopologyDeviceDisconnected() { + if(topologyRegistration == null) { + return; + } + + try { + topologyRegistration.close(); + } catch (final Exception e) { + // Only log and ignore + logger.warn("Unable to unregister mount instance for {}. Ignoring exception", id.getTopologyPath(), e); + } finally { + topologyRegistration = null; + } + } + @Override synchronized public void close() throws Exception { if(registration != null) { onDeviceDisconnected(); + onTopologyDeviceDisconnected(); } mountService = null; } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java new file mode 100644 index 0000000000..83664e440f --- /dev/null +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connect.netconf.sal; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.UnavailableCapabilities; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.UnavailableCapabilitiesBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapabilityBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class NetconfDeviceTopologyAdapter implements AutoCloseable { + + public static final Logger logger = LoggerFactory.getLogger(NetconfDeviceTopologyAdapter.class); + public static final Function, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function, UnavailableCapability>() { + @Override + public UnavailableCapability apply(final Entry input) { + return new UnavailableCapabilityBuilder() + .setCapability(input.getKey().toString()) + .setFailureReason(input.getValue()).build(); + } + }; + public static final Function AVAILABLE_CAPABILITY_TRANSFORMER = new Function() { + @Override + public String apply(QName qName) { + return qName.toString(); + } + }; + + private final RemoteDeviceId id; + private final DataBroker dataService; + + private final InstanceIdentifier networkTopologyPath; + private final KeyedInstanceIdentifier topologyListPath; + private static final String UNKNOWN_REASON = "Unknown reason"; + + NetconfDeviceTopologyAdapter(final RemoteDeviceId id, final DataBroker dataService) { + this.id = id; + this.dataService = dataService; + + this.networkTopologyPath = InstanceIdentifier.builder(NetworkTopology.class).build(); + this.topologyListPath = networkTopologyPath.child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()))); + + initDeviceData(); + } + + private void initDeviceData() { + final WriteTransaction writeTx = dataService.newWriteOnlyTransaction(); + + createNetworkTopologyIfNotPresent(writeTx); + + final InstanceIdentifier path = id.getTopologyBindingPath(); + NodeBuilder nodeBuilder = getNodeIdBuilder(id); + NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder(); + netconfNodeBuilder.setConnectionStatus(ConnectionStatus.Connecting); + netconfNodeBuilder.setHost(id.getHost()); + netconfNodeBuilder.setPort(new PortNumber(id.getAddress().getPort())); + nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build()); + Node node = nodeBuilder.build(); + + logger.trace("{}: Init device state transaction {} putting if absent operational data started.", id, writeTx.getIdentifier()); + writeTx.put(LogicalDatastoreType.OPERATIONAL, path, node); + logger.trace("{}: Init device state transaction {} putting operational data ended.", id, writeTx.getIdentifier()); + + logger.trace("{}: Init device state transaction {} putting if absent config data started.", id, writeTx.getIdentifier()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, path, getNodeWithId(id)); + logger.trace("{}: Init device state transaction {} putting config data ended.", id, writeTx.getIdentifier()); + + commitTransaction(writeTx, "init"); + } + + public void updateDeviceData(boolean up, NetconfDeviceCapabilities capabilities) { + final Node data = buildDataForNetconfNode(up, capabilities); + + final WriteTransaction writeTx = dataService.newWriteOnlyTransaction(); + logger.trace("{}: Update device state transaction {} merging operational data started.", id, writeTx.getIdentifier()); + writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath(), data); + logger.trace("{}: Update device state transaction {} merging operational data ended.", id, writeTx.getIdentifier()); + + commitTransaction(writeTx, "update"); + } + + public void setDeviceAsFailed(Throwable throwable) { + String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON; + + final NetconfNode netconfNode = new NetconfNodeBuilder().setConnectionStatus(ConnectionStatus.UnableToConnect).setConnectedMessage(reason).build(); + final Node data = getNodeIdBuilder(id).addAugmentation(NetconfNode.class, netconfNode).build(); + + final WriteTransaction writeTx = dataService.newWriteOnlyTransaction(); + logger.trace("{}: Setting device state as failed {} putting operational data started.", id, writeTx.getIdentifier()); + writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath(), data); + logger.trace("{}: Setting device state as failed {} putting operational data ended.", id, writeTx.getIdentifier()); + + commitTransaction(writeTx, "update-failed-device"); + } + + private Node buildDataForNetconfNode(boolean up, NetconfDeviceCapabilities capabilities) { + List capabilityList = new ArrayList<>(); + capabilityList.addAll(capabilities.getNonModuleBasedCapabilities()); + capabilityList.addAll(FluentIterable.from(capabilities.getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList()); + final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder(); + avCapabalitiesBuilder.setAvailableCapability(capabilityList); + + final UnavailableCapabilities unavailableCapabilities = + new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(capabilities.getUnresolvedCapabilites().entrySet()) + .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build(); + + final NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder() + .setHost(id.getHost()) + .setPort(new PortNumber(id.getAddress().getPort())) + .setConnectionStatus(up ? ConnectionStatus.Connected : ConnectionStatus.Connecting) + .setAvailableCapabilities(avCapabalitiesBuilder.build()) + .setUnavailableCapabilities(unavailableCapabilities); + + final NodeBuilder nodeBuilder = getNodeIdBuilder(id); + final Node node = nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build()).build(); + + return node; + } + + public void removeDeviceConfiguration() { + final WriteTransaction writeTx = dataService.newWriteOnlyTransaction(); + + logger.trace("{}: Close device state transaction {} removing all data started.", id, writeTx.getIdentifier()); + writeTx.delete(LogicalDatastoreType.CONFIGURATION, id.getTopologyBindingPath()); + writeTx.delete(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath()); + logger.trace("{}: Close device state transaction {} removing all data ended.", id, writeTx.getIdentifier()); + + commitTransaction(writeTx, "close"); + } + + private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) { + + final NetworkTopology networkTopology = new NetworkTopologyBuilder().build(); + logger.trace("{}: Merging {} container to ensure its presence", id, networkTopology.QNAME, writeTx.getIdentifier()); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, networkTopologyPath, networkTopology); + writeTx.merge(LogicalDatastoreType.OPERATIONAL, networkTopologyPath, networkTopology); + + final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(TopologyNetconf.QNAME.getLocalName())).build(); + logger.trace("{}: Merging {} container to ensure its presence", id, topology.QNAME, writeTx.getIdentifier()); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, topologyListPath, topology); + writeTx.merge(LogicalDatastoreType.OPERATIONAL, topologyListPath, topology); + } + + private void commitTransaction(final WriteTransaction transaction, final String txType) { + logger.trace("{}: Committing Transaction {}:{}", id, txType, transaction.getIdentifier()); + final CheckedFuture result = transaction.submit(); + + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + logger.trace("{}: Transaction({}) {} SUCCESSFUL", id, txType, transaction.getIdentifier()); + } + + @Override + public void onFailure(final Throwable t) { + logger.error("{}: Transaction({}) {} FAILED!", id, txType, transaction.getIdentifier(), t); + throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly", t); + } + }); + + } + + private static Node getNodeWithId(final RemoteDeviceId id) { + final NodeBuilder builder = getNodeIdBuilder(id); + return builder.build(); + } + + private static NodeBuilder getNodeIdBuilder(final RemoteDeviceId id) { + final NodeBuilder nodeBuilder = new NodeBuilder(); + nodeBuilder.setKey(new NodeKey(new NodeId(id.getName()))); + return nodeBuilder; + } + + @Override + public void close() throws Exception { + removeDeviceConfiguration(); + } +} diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/AbstractWriteTx.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/AbstractWriteTx.java index 165d9c452d..435ef9915d 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/AbstractWriteTx.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/AbstractWriteTx.java @@ -14,7 +14,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -27,11 +27,11 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction { protected final RemoteDeviceId id; protected final NetconfBaseOps netOps; protected final DataNormalizer normalizer; - protected final NetconfSessionCapabilities netconfSessionPreferences; + protected final NetconfSessionPreferences netconfSessionPreferences; // Allow commit to be called only once protected boolean finished = false; - public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) { + public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) { this.netOps = netOps; this.id = id; this.normalizer = normalizer; diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateRunningTx.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateRunningTx.java index 4a9a9398d0..710700b362 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateRunningTx.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateRunningTx.java @@ -12,7 +12,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -32,7 +32,7 @@ public class WriteCandidateRunningTx extends WriteCandidateTx { private static final Logger LOG = LoggerFactory.getLogger(WriteCandidateRunningTx.class); - public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) { + public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) { super(id, netOps, normalizer, netconfSessionPreferences); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateTx.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateTx.java index 0ea6298398..f9bf3c75fd 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateTx.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateTx.java @@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -69,7 +69,7 @@ public class WriteCandidateTx extends AbstractWriteTx { } }; - public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) { + public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) { super(rpc, id, normalizer, netconfSessionPreferences); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteRunningTx.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteRunningTx.java index 28173b1da3..f92e40fb57 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteRunningTx.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteRunningTx.java @@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -50,7 +50,7 @@ public class WriteRunningTx extends AbstractWriteTx { private static final Logger LOG = LoggerFactory.getLogger(WriteRunningTx.class); public WriteRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, - final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) { + final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) { super(netOps, id, normalizer, netconfSessionPreferences); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java index 333b42e1c5..7f13a7a5dd 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java @@ -7,33 +7,67 @@ */ package org.opendaylight.controller.sal.connect.util; +import com.google.common.base.Preconditions; +import java.net.InetSocketAddress; import org.opendaylight.controller.config.api.ModuleIdentifier; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.HostBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; - -import com.google.common.base.Preconditions; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class RemoteDeviceId { +public final class RemoteDeviceId { private final String name; private final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier path; private final InstanceIdentifier bindingPath; private final NodeKey key; + private final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier topologyPath; + private final InstanceIdentifier topologyBindingPath; + private InetSocketAddress address; + private Host host; + @Deprecated public RemoteDeviceId(final ModuleIdentifier identifier) { this(Preconditions.checkNotNull(identifier).getInstanceName()); } + public RemoteDeviceId(final ModuleIdentifier identifier, Host host) { + this(identifier); + this.host = host; + } + + public RemoteDeviceId(final ModuleIdentifier identifier, InetSocketAddress address) { + this(identifier); + this.address = address; + this.host = buildHost(); + } + + @Deprecated public RemoteDeviceId(final String name) { Preconditions.checkNotNull(name); this.name = name; this.key = new NodeKey(new NodeId(name)); this.path = createBIPath(name); this.bindingPath = createBindingPath(key); + this.topologyPath = createBIPathForTopology(name); + this.topologyBindingPath = createBindingPathForTopology(key); + } + + public RemoteDeviceId(final String name, InetSocketAddress address) { + this(name); + this.address = address; + this.host = buildHost(); } private static InstanceIdentifier createBindingPath(final NodeKey key) { @@ -48,6 +82,32 @@ public class RemoteDeviceId { return builder.build(); } + private static InstanceIdentifier createBindingPathForTopology(final NodeKey key) { + final InstanceIdentifier networkTopology = InstanceIdentifier.builder(NetworkTopology.class).build(); + final KeyedInstanceIdentifier topology = networkTopology.child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()))); + return topology + .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class, + new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey + (new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId(key.getId().getValue()))); + } + + private static org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier createBIPathForTopology(final String name) { + final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.InstanceIdentifierBuilder builder = + org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.builder(); + builder + .node(NetworkTopology.QNAME) + .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), TopologyNetconf.QNAME.getLocalName()) + .nodeWithKey(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.QNAME, + QName.create(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.QNAME, "node-id"), name); + return builder.build(); + } + + private Host buildHost() { + return address.getAddress().getHostAddress() != null + ? HostBuilder.getDefaultInstance(address.getAddress().getHostAddress()) + : HostBuilder.getDefaultInstance(address.getAddress().getHostName()); + } + public String getName() { return name; } @@ -64,6 +124,22 @@ public class RemoteDeviceId { return key; } + public InstanceIdentifier getTopologyBindingPath() { + return topologyBindingPath; + } + + public YangInstanceIdentifier getTopologyPath() { + return topologyPath; + } + + public InetSocketAddress getAddress() { + return address; + } + + public Host getHost() { + return host; + } + @Override public String toString() { return "RemoteDevice{" + name +'}'; diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/yang/netconf-node-topology.yang b/opendaylight/md-sal/sal-netconf-connector/src/main/yang/netconf-node-topology.yang new file mode 100644 index 0000000000..11bf6a549c --- /dev/null +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/yang/netconf-node-topology.yang @@ -0,0 +1,75 @@ +module netconf-node-topology { + namespace "urn:opendaylight:netconf-node-topology"; + prefix "nettop"; + + import network-topology { prefix nt; revision-date 2013-10-21; } + import yang-ext { prefix ext; revision-date "2013-07-09";} + import ietf-inet-types { prefix inet; revision-date "2010-09-24"; } + + revision "2015-01-14" { + description "Initial revision of Topology model"; + } + + augment "/nt:network-topology/nt:topology/nt:topology-types" { + container topology-netconf { + } + } + + grouping netconf-node-fields { + leaf connection-status { + type enumeration { + enum connecting; + enum connected; + enum unable-to-connect; + } + } + + leaf host { + type inet:host; + } + + leaf port { + type inet:port-number; + } + + leaf connected-message { + type string; + } + + container available-capabilities { + leaf-list available-capability { + type string; + } + } + + container unavailable-capabilities { + list unavailable-capability { + leaf capability { + type string; + } + + leaf failure-reason { + type enumeration { + enum missing-source; + enum unable-to-resolve; + } + } + } + } + + container pass-through { + when "../connection-status = connected"; + description + "When the underlying node is connected, its NETCONF context + is available verbatim under this container through the + mount extension."; + } + } + + augment "/nt:network-topology/nt:topology/nt:node" { + when "../../nt:topology-types/topology-netconf"; + ext:augment-identifier "netconf-node"; + + uses netconf-node-fields; + } +} diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java index 80ac4d7376..0ddafa375f 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; + import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; @@ -39,7 +40,7 @@ import org.opendaylight.controller.sal.connect.api.MessageTransformer; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -90,7 +91,7 @@ public class NetconfDeviceTest { private static final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver = new NetconfStateSchemas.NetconfStateSchemasResolver() { @Override - public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) { + public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) { return NetconfStateSchemas.EMPTY; } }; @@ -99,7 +100,7 @@ public class NetconfDeviceTest { public void testNetconfDeviceFailFirstSchemaFailSecondEmpty() throws Exception { final ArrayList capList = Lists.newArrayList(TEST_CAPABILITY); - final RemoteDeviceHandler facade = getFacade(); + final RemoteDeviceHandler facade = getFacade(); final RemoteDeviceCommunicator listener = getListener(); final SchemaContextFactory schemaFactory = getSchemaFactory(); @@ -116,7 +117,7 @@ public class NetconfDeviceTest { = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver); final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer()); // Monitoring not supported - final NetconfSessionCapabilities sessionCaps = getSessionCaps(false, capList); + final NetconfSessionPreferences sessionCaps = getSessionCaps(false, capList); device.onRemoteSessionUp(sessionCaps, listener); Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected(); @@ -126,7 +127,7 @@ public class NetconfDeviceTest { @Test public void testNetconfDeviceMissingSource() throws Exception { - final RemoteDeviceHandler facade = getFacade(); + final RemoteDeviceHandler facade = getFacade(); final RemoteDeviceCommunicator listener = getListener(); final SchemaContextFactory schemaFactory = getSchemaFactory(); @@ -148,10 +149,10 @@ public class NetconfDeviceTest { = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver); final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer()); // Monitoring supported - final NetconfSessionCapabilities sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2)); + final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2)); device.onRemoteSessionUp(sessionCaps, listener); - Mockito.verify(facade, Mockito.timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class)); + Mockito.verify(facade, Mockito.timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class)); Mockito.verify(schemaFactory, times(2)).createSchemaContext(anyCollectionOf(SourceIdentifier.class)); } @@ -165,7 +166,7 @@ public class NetconfDeviceTest { @Test public void testNotificationBeforeSchema() throws Exception { - final RemoteDeviceHandler facade = getFacade(); + final RemoteDeviceHandler facade = getFacade(); final RemoteDeviceCommunicator listener = getListener(); final MessageTransformer messageTransformer = getMessageTransformer(); @@ -179,7 +180,7 @@ public class NetconfDeviceTest { verify(facade, times(0)).onNotification(any(CompositeNode.class)); - final NetconfSessionCapabilities sessionCaps = getSessionCaps(true, + final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY)); device.onRemoteSessionUp(sessionCaps, listener); @@ -194,7 +195,7 @@ public class NetconfDeviceTest { @Test public void testNetconfDeviceReconnect() throws Exception { - final RemoteDeviceHandler facade = getFacade(); + final RemoteDeviceHandler facade = getFacade(); final RemoteDeviceCommunicator listener = getListener(); final SchemaContextFactory schemaContextProviderFactory = getSchemaFactory(); @@ -203,13 +204,13 @@ public class NetconfDeviceTest { final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaContextProviderFactory, stateSchemasResolver); final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer); - final NetconfSessionCapabilities sessionCaps = getSessionCaps(true, + final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&revision=" + TEST_REVISION)); device.onRemoteSessionUp(sessionCaps, listener); verify(schemaContextProviderFactory, timeout(5000)).createSchemaContext(any(Collection.class)); verify(messageTransformer, timeout(5000)).onGlobalContextUpdated(any(SchemaContext.class)); - verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class)); + verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class)); device.onRemoteSessionDown(); verify(facade, timeout(5000)).onDeviceDisconnected(); @@ -218,7 +219,7 @@ public class NetconfDeviceTest { verify(schemaContextProviderFactory, timeout(5000).times(2)).createSchemaContext(any(Collection.class)); verify(messageTransformer, timeout(5000).times(3)).onGlobalContextUpdated(any(SchemaContext.class)); - verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class)); + verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class)); } private SchemaContextFactory getSchemaFactory() { @@ -236,9 +237,9 @@ public class NetconfDeviceTest { return parser.resolveSchemaContext(models); } - private RemoteDeviceHandler getFacade() throws Exception { - final RemoteDeviceHandler remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class); - doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class)); + private RemoteDeviceHandler getFacade() throws Exception { + final RemoteDeviceHandler remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class); + doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class)); doNothing().when(remoteDeviceHandler).onDeviceDisconnected(); doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class)); return remoteDeviceHandler; @@ -283,7 +284,7 @@ public class NetconfDeviceTest { return messageTransformer; } - public NetconfSessionCapabilities getSessionCaps(final boolean addMonitor, final Collection additionalCapabilities) { + public NetconfSessionPreferences getSessionCaps(final boolean addMonitor, final Collection additionalCapabilities) { final ArrayList capabilities = Lists.newArrayList( XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1); @@ -294,7 +295,7 @@ public class NetconfDeviceTest { capabilities.addAll(additionalCapabilities); - return NetconfSessionCapabilities.fromStrings( + return NetconfSessionPreferences.fromStrings( capabilities); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java index a24034d2f0..fad3d8e1ea 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java @@ -77,7 +77,7 @@ public class NetconfDeviceCommunicatorTest { NetconfClientSession mockSession; @Mock - RemoteDevice mockDevice; + RemoteDevice mockDevice; NetconfDeviceCommunicator communicator; @@ -92,7 +92,7 @@ public class NetconfDeviceCommunicatorTest { void setupSession() { doReturn( Collections.emptySet() ).when( mockSession ).getServerCapabilities(); - doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionCapabilities.class ), + doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionPreferences.class ), any( RemoteDeviceCommunicator.class ) ); communicator.onSessionUp( mockSession ); } @@ -130,8 +130,8 @@ public class NetconfDeviceCommunicatorTest { testCapability ); doReturn( serverCapabilities ).when( mockSession ).getServerCapabilities(); - ArgumentCaptor netconfSessionCapabilities = - ArgumentCaptor.forClass( NetconfSessionCapabilities.class ); + ArgumentCaptor netconfSessionCapabilities = + ArgumentCaptor.forClass( NetconfSessionPreferences.class ); doNothing().when( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) ); communicator.onSessionUp( mockSession ); @@ -139,7 +139,7 @@ public class NetconfDeviceCommunicatorTest { verify( mockSession ).getServerCapabilities(); verify( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) ); - NetconfSessionCapabilities actualCapabilites = netconfSessionCapabilities.getValue(); + NetconfSessionPreferences actualCapabilites = netconfSessionCapabilities.getValue(); assertEquals( "containsModuleCapability", true, actualCapabilites.containsNonModuleCapability( NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString()) ); assertEquals( "containsModuleCapability", false, actualCapabilites.containsNonModuleCapability(testCapability) ); @@ -340,7 +340,7 @@ public class NetconfDeviceCommunicatorTest { */ @Test public void testNetconfDeviceReconnectInCommunicator() throws Exception { - final RemoteDevice device = mock(RemoteDevice.class); + final RemoteDevice device = mock(RemoteDevice.class); final TimedReconnectStrategy timedReconnectStrategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, 10000, 0, 1.0, null, 100L, null); final ReconnectStrategy reconnectStrategy = spy(new ReconnectStrategy() { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilitiesTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferencesTest.java similarity index 81% rename from opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilitiesTest.java rename to opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferencesTest.java index ae7d9c28ac..653b641353 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilitiesTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferencesTest.java @@ -10,7 +10,7 @@ import org.junit.Test; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.yangtools.yang.common.QName; -public class NetconfSessionCapabilitiesTest { +public class NetconfSessionPreferencesTest { @Test public void testMerge() throws Exception { @@ -21,7 +21,7 @@ public class NetconfSessionCapabilitiesTest { "urn:ietf:params:netconf:base:1.0", "urn:ietf:params:netconf:capability:rollback-on-error:1.0" ); - final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1); + final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1); assertCaps(sessionCaps1, 2, 3); final List caps2 = Lists.newArrayList( @@ -29,10 +29,10 @@ public class NetconfSessionCapabilitiesTest { "namespace:4?module=module4&revision=2012-12-12", "randomNonModuleCap" ); - final NetconfSessionCapabilities sessionCaps2 = NetconfSessionCapabilities.fromStrings(caps2); + final NetconfSessionPreferences sessionCaps2 = NetconfSessionPreferences.fromStrings(caps2); assertCaps(sessionCaps2, 1, 2); - final NetconfSessionCapabilities merged = sessionCaps1.replaceModuleCaps(sessionCaps2); + final NetconfSessionPreferences merged = sessionCaps1.replaceModuleCaps(sessionCaps2); assertCaps(merged, 2, 2 + 1 /*Preserved monitoring*/); for (final QName qName : sessionCaps2.getModuleBasedCaps()) { assertThat(merged.getModuleBasedCaps(), hasItem(qName)); @@ -52,11 +52,11 @@ public class NetconfSessionCapabilitiesTest { "namespace:2?module=module2&RANDOMSTRING;revision=2013-12-12" // This one should be ignored(same as first), since revision is in wrong format ); - final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1); + final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1); assertCaps(sessionCaps1, 0, 3); } - private void assertCaps(final NetconfSessionCapabilities sessionCaps1, final int nonModuleCaps, final int moduleCaps) { + private void assertCaps(final NetconfSessionPreferences sessionCaps1, final int nonModuleCaps, final int moduleCaps) { assertEquals(nonModuleCaps, sessionCaps1.getNonModuleCaps().size()); assertEquals(moduleCaps, sessionCaps1.getModuleBasedCaps().size()); } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapterTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapterTest.java new file mode 100644 index 0000000000..a1551b23b6 --- /dev/null +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapterTest.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connect.netconf.sal; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.Futures; +import java.net.InetSocketAddress; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities; +import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; + +public class NetconfDeviceTopologyAdapterTest { + + private RemoteDeviceId id = new RemoteDeviceId("test", new InetSocketAddress("localhost", 22)); + + @Mock + private DataBroker broker; + @Mock + private WriteTransaction writeTx; + @Mock + private Node data; + + private String txIdent = "test transaction"; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + doReturn(writeTx).when(broker).newWriteOnlyTransaction(); + doNothing().when(writeTx).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class)); + doNothing().when(writeTx).merge(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class)); + + doReturn(txIdent).when(writeTx).getIdentifier(); + } + + @Test + public void testFailedDevice() throws Exception { + doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit(); + + NetconfDeviceTopologyAdapter adapter = new NetconfDeviceTopologyAdapter(id, broker); + adapter.setDeviceAsFailed(null); + + verify(broker, times(2)).newWriteOnlyTransaction(); + verify(writeTx, times(3)).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class)); + } + + @Test + public void testDeviceUpdate() throws Exception { + doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit(); + + NetconfDeviceTopologyAdapter adapter = new NetconfDeviceTopologyAdapter(id, broker); + adapter.updateDeviceData(true, new NetconfDeviceCapabilities()); + + verify(broker, times(2)).newWriteOnlyTransaction(); + verify(writeTx, times(3)).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class)); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java index ce97541fe4..a37fade915 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java @@ -21,7 +21,7 @@ import org.mockito.MockitoAnnotations; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; @@ -60,7 +60,7 @@ public class NetconfDeviceWriteOnlyTxTest { @Test public void testDiscardChanges() { final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc), normalizer, - NetconfSessionCapabilities.fromStrings(Collections.emptySet())); + NetconfSessionPreferences.fromStrings(Collections.emptySet())); final CheckedFuture submitFuture = tx.submit(); try { submitFuture.checkedGet(); @@ -84,7 +84,7 @@ public class NetconfDeviceWriteOnlyTxTest { .when(rpc).invokeRpc(any(QName.class), any(CompositeNode.class)); final WriteRunningTx tx = new WriteRunningTx(id, new NetconfBaseOps(rpc), normalizer, - NetconfSessionCapabilities.fromStrings(Collections.emptySet())); + NetconfSessionPreferences.fromStrings(Collections.emptySet())); try { tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId); } catch (final Exception e) { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java index 48ccd824d4..13399f6f9d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java @@ -10,16 +10,15 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.Terminated; import akka.actor.UntypedActor; -import akka.event.Logging; -import akka.event.LoggingAdapter; import org.opendaylight.controller.cluster.common.actor.Monitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TerminationMonitor extends UntypedActor{ - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); + private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class); public TerminationMonitor(){ - LOG.info("Created TerminationMonitor"); + LOG.debug("Created TerminationMonitor"); } @Override public void onReceive(Object message) throws Exception { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 845c1c819a..219646d847 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.japi.Option; import akka.japi.Pair; import com.google.common.base.Preconditions; @@ -32,8 +30,6 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; */ public class RpcRegistry extends BucketStore { - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - public RpcRegistry() { getLocalBucket().setData(new RoutingTable()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 934609b7cf..628deb4311 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -13,8 +13,6 @@ import akka.actor.ActorRefProvider; import akka.actor.Address; import akka.actor.Props; import akka.cluster.ClusterActorRefProvider; -import akka.event.Logging; -import akka.event.LoggingAdapter; import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; @@ -29,6 +27,8 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A store that syncs its data across nodes in the cluster. @@ -43,7 +43,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe private static final Long NO_VERSION = -1L; - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + protected final Logger log = LoggerFactory.getLogger(getClass()); /** * Bucket owned by the node diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 1bbcc69f5e..8af1c83c55 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -17,14 +17,7 @@ import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.pattern.Patterns; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -32,15 +25,20 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. @@ -61,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go public class Gossiper extends AbstractUntypedActorWithMetering { - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + private final Logger log = LoggerFactory.getLogger(getClass()); private Cluster cluster; @@ -121,30 +119,29 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @Override public void postStop(){ - if (cluster != null) + if (cluster != null) { cluster.unsubscribe(getSelf()); - if (gossipTask != null) + } + if (gossipTask != null) { gossipTask.cancel(); + } } @Override protected void handleReceive(Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) + if (message instanceof GossipTick) { receiveGossipTick(); - - //Message from remote gossiper with its bucket versions - else if (message instanceof GossipStatus) + } else if (message instanceof GossipStatus) { + // Message from remote gossiper with its bucket versions receiveGossipStatus((GossipStatus) message); - - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions - else if (message instanceof GossipEnvelope) + } else if (message instanceof GossipEnvelope) { + // Message from remote gossiper with buckets. This is usually in response to GossipStatus + // message. The contained buckets are newer as determined by the remote gossiper by + // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); - - else if (message instanceof ClusterEvent.MemberUp) { + } else if (message instanceof ClusterEvent.MemberUp) { receiveMemberUp(((ClusterEvent.MemberUp) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { @@ -153,8 +150,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } else if ( message instanceof ClusterEvent.UnreachableMember){ receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); - } else + } else { unhandled(message); + } } /** @@ -181,11 +179,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ void receiveMemberUp(Member member) { - if (selfAddress.equals(member.address())) + if (selfAddress.equals(member.address())) { return; //ignore up notification for self + } - if (!clusterMembers.contains(member.address())) + if (!clusterMembers.contains(member.address())) { clusterMembers.add(member.address()); + } if(log.isDebugEnabled()) { log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } @@ -198,13 +198,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ void receiveGossipTick(){ - if (clusterMembers.size() == 0) return; //no members to send gossip status to + if (clusterMembers.size() == 0) { + return; //no members to send gossip status to + } Address remoteMemberToGossipTo; - if (clusterMembers.size() == 1) + if (clusterMembers.size() == 1) { remoteMemberToGossipTo = clusterMembers.get(0); - else { + } else { Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); remoteMemberToGossipTo = clusterMembers.get(randomIndex); } @@ -229,8 +231,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ void receiveGossipStatus(GossipStatus status){ //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) + if (!clusterMembers.contains(status.from())) { return; + } final ActorRef sender = getSender(); Future futureReply = @@ -385,19 +388,23 @@ public class Gossiper extends AbstractUntypedActorWithMetering { for (Address address : remoteVersions.keySet()){ - if (localVersions.get(address) == null || remoteVersions.get(address) == null) + if (localVersions.get(address) == null || remoteVersions.get(address) == null) { continue; //this condition is taken care of by above diffs - if (localVersions.get(address) < remoteVersions.get(address)) + } + if (localVersions.get(address) < remoteVersions.get(address)) { localIsOlder.add(address); - else if (localVersions.get(address) > remoteVersions.get(address)) + } else if (localVersions.get(address) > remoteVersions.get(address)) { localIsNewer.add(address); + } } - if (!localIsOlder.isEmpty()) + if (!localIsOlder.isEmpty()) { sendGossipStatusTo(sender, localVersions ); + } - if (!localIsNewer.isEmpty()) + if (!localIsNewer.isEmpty()) { sendGossipTo(sender, localIsNewer);//send newer buckets to remote + } } return null; diff --git a/opendaylight/netconf/netconf-cli/pom.xml b/opendaylight/netconf/netconf-cli/pom.xml index c292d93206..e1226a5dc4 100644 --- a/opendaylight/netconf/netconf-cli/pom.xml +++ b/opendaylight/netconf/netconf-cli/pom.xml @@ -65,6 +65,10 @@ org.opendaylight.yangtools yang-parser-impl + + org.opendaylight.controller + sal-netconf-connector + diff --git a/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Main.java b/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Main.java index 64397de118..8c38ee29e9 100644 --- a/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Main.java +++ b/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Main.java @@ -70,7 +70,7 @@ public class Main { } case SSH: { writeStatus(consoleIO, "Connecting to %s via SSH. Please wait.", cliArgs.getAddress()); - connectionManager.connectBlocking(cliArgs.getAddress(), getClientSshConfig(cliArgs)); + connectionManager.connectBlocking(cliArgs.getAddress(), cliArgs.getServerAddress(), getClientSshConfig(cliArgs)); break; } case NONE: {/* Do not connect initially */ diff --git a/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java b/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java index d5c9dc6fc7..bede549536 100644 --- a/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java +++ b/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java @@ -14,7 +14,7 @@ import org.opendaylight.controller.netconf.cli.commands.CommandDispatcher; import org.opendaylight.controller.netconf.cli.io.ConsoleContext; import org.opendaylight.controller.netconf.cli.io.ConsoleIO; import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -23,7 +23,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * Implementation of RemoteDeviceHandler. Integrates cli with * sal-netconf-connector. */ -public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler { +public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler { private final CommandDispatcher commandDispatcher; private final SchemaContextRegistry schemaContextRegistry; @@ -42,7 +42,7 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler connectBlocking(final String name, final NetconfClientConfigurationBuilder configBuilder) { - this.connect(name, configBuilder); + public synchronized Set connectBlocking(final String name, final InetSocketAddress address, final NetconfClientConfigurationBuilder configBuilder) { + this.connect(name, address, configBuilder); synchronized (handler) { while (handler.isUp() == false) { try { diff --git a/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java b/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java index f702aa3805..54706b8cb9 100644 --- a/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java +++ b/opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java @@ -53,11 +53,11 @@ public class Connect extends AbstractCommand { @Override public Output invoke(final Input inputArgs) { final NetconfClientConfigurationBuilder config = getConfig(inputArgs); - return invoke(config, getArgument(inputArgs, "address-name", String.class)); + return invoke(config, getArgument(inputArgs, "address-name", String.class), inputArgs); } - private Output invoke(final NetconfClientConfigurationBuilder config, final String addressName) { - final Set remoteCmds = connectManager.connectBlocking(addressName, config); + private Output invoke(final NetconfClientConfigurationBuilder config, final String addressName, final Input inputArgs) { + final Set remoteCmds = connectManager.connectBlocking(addressName, getAdress(inputArgs), config); final ArrayList> output = Lists.newArrayList(); output.add(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "status"), null, "Connection initiated")); @@ -92,6 +92,17 @@ public class Connect extends AbstractCommand { .withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH); } + private InetSocketAddress getAdress(final Input inputArgs) { + final String address = getArgument(inputArgs, "address-name", String.class); + final InetSocketAddress inetAddress; + try { + inetAddress = new InetSocketAddress(InetAddress.getByName(address), getArgument(inputArgs, "address-port", Integer.class)); + } catch (final UnknownHostException e) { + throw new IllegalArgumentException("Unable to use address: " + address, e); + } + return inetAddress; + } + private Optional getArgumentOpt(final Input inputArgs, final String argName, final Class type) { final QName argQName = QName.create(getCommandId(), argName); final Node argumentNode = inputArgs.getArg(argName); diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java index 7d568b6462..a938fbf565 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java @@ -60,7 +60,7 @@ import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; -import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities; +import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.protocol.framework.NeverReconnectStrategy; import org.opendaylight.yangtools.yang.common.QName; @@ -199,8 +199,8 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { } static NetconfDeviceCommunicator getSessionListener() { - RemoteDevice mockedRemoteDevice = mock(RemoteDevice.class); - doNothing().when(mockedRemoteDevice).onRemoteSessionUp(any(NetconfSessionCapabilities.class), any(RemoteDeviceCommunicator.class)); + RemoteDevice mockedRemoteDevice = mock(RemoteDevice.class); + doNothing().when(mockedRemoteDevice).onRemoteSessionUp(any(NetconfSessionPreferences.class), any(RemoteDeviceCommunicator.class)); doNothing().when(mockedRemoteDevice).onRemoteSessionDown(); return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test"), mockedRemoteDevice); } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToEXIEncoder.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToEXIEncoder.java index aceb6ac520..5d6d1aa083 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToEXIEncoder.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToEXIEncoder.java @@ -30,23 +30,29 @@ public final class NetconfMessageToEXIEncoder extends MessageToByteEncoder content; + + public FakeModuleBuilderCapability(final ModuleBuilder input, final String inputStream) { + this.input = input; + this.content = Optional.of(inputStream); + } + + @Override + public String getCapabilityUri() { + // FIXME capabilities in Netconf-impl need to check for NO REVISION + final String withoutRevision = getModuleNamespace().get() + "?module=" + getModuleName().get(); + return hasRevision() ? withoutRevision + "&revision=" + Util.writeDate(input.getRevision()) : withoutRevision; + } + + @Override + public Optional getModuleNamespace() { + return Optional.of(input.getNamespace().toString()); + } + + @Override + public Optional getModuleName() { + return Optional.of(input.getName()); + } + + @Override + public Optional getRevision() { + return Optional.of(hasRevision() ? QName.formattedRevision(input.getRevision()) : ""); + } + + private boolean hasRevision() { + return !input.getRevision().equals(NO_REVISION); + } + + /** + * + * @return empty schema source to trigger schema resolution exception. + */ + @Override + public Optional getCapabilitySchema() { + return Optional.absent(); + } + + @Override + public List getLocation() { + return Collections.emptyList(); + } +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java index 83e1f9129b..a5f4947474 100644 --- a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java @@ -100,6 +100,8 @@ public class NetconfDeviceSimulator implements Closeable { private final ScheduledExecutorService minaTimerExecutor; private final ExecutorService nioExecutor; + private boolean sendFakeSchema = false; + public NetconfDeviceSimulator() { // TODO make pool size configurable this(new NioEventLoopGroup(), new HashedWheelTimer(), @@ -119,7 +121,12 @@ public class NetconfDeviceSimulator implements Closeable { final Set capabilities = Sets.newHashSet(Collections2.transform(moduleBuilders.keySet(), new Function() { @Override public Capability apply(final ModuleBuilder input) { - return new ModuleBuilderCapability(input, moduleBuilders.get(input)); + if (sendFakeSchema) { + sendFakeSchema = false; + return new FakeModuleBuilderCapability(input, moduleBuilders.get(input)); + } else { + return new ModuleBuilderCapability(input, moduleBuilders.get(input)); + } } })); diff --git a/pom.xml b/pom.xml index f588f3f17c..1c4bc4d4d5 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,8 @@ releasepom 0.2.0-SNAPSHOT pom - controller + controller +