<feature version='${mdsal.version}'>odl-mdsal-common</feature>
<feature version='${config.version}'>odl-config-startup</feature>
<feature version='${config.version}'>odl-config-netty</feature>
+ <bundle>mvn:com.lmax/disruptor/${lmax.version}</bundle>
<bundle>mvn:org.opendaylight.controller/sal-core-api/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/sal-core-spi/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/sal-broker-impl/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/netconf-api/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/netconf-auth/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/ietf-netconf-monitoring/${project.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/ietf-netconf/${project.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/ietf-netconf-notifications/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/ietf-netconf-monitoring-extension/${project.version}</bundle>
<bundle>mvn:org.opendaylight.yangtools.model/ietf-inet-types/${ietf-inet-types.version}</bundle>
<bundle>mvn:org.opendaylight.yangtools.model/ietf-yang-types/${ietf-yang-types.version}</bundle>
<feature version='${project.version}'>odl-config-netconf-connector</feature>
<!-- Netconf will not provide schemas without monitoring -->
<feature version='${project.version}'>odl-netconf-monitoring</feature>
+ <feature version='${project.version}'>odl-netconf-notifications-impl</feature>
<bundle>mvn:org.opendaylight.controller/netconf-impl/${project.version}</bundle>
</feature>
<feature name='odl-config-netconf-connector' version='${project.version}' description="OpenDaylight :: Netconf :: Connector">
<feature version='${project.version}'>odl-netconf-api</feature>
<feature version='${project.version}'>odl-netconf-mapping-api</feature>
<feature version='${project.version}'>odl-netconf-util</feature>
+ <feature version='${project.version}'>odl-netconf-notifications-api</feature>
<bundle>mvn:org.opendaylight.controller/config-netconf-connector/${project.version}</bundle>
</feature>
<feature name='odl-netconf-netty-util' version='${project.version}' description="OpenDaylight :: Netconf :: Netty Util">
<feature version='${project.version}'>odl-netconf-util</feature>
<bundle>mvn:org.opendaylight.controller/netconf-monitoring/${project.version}</bundle>
</feature>
+ <feature name='odl-netconf-notifications-api' version='${project.version}' description="OpenDaylight :: Netconf :: Notification :: Api">
+ <feature version='${project.version}'>odl-netconf-api</feature>
+ <bundle>mvn:org.opendaylight.controller/netconf-notifications-api/${project.version}</bundle>
+ </feature>
+ <feature name='odl-netconf-notifications-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring :: Impl">
+ <feature version='${project.version}'>odl-netconf-notifications-api</feature>
+ <bundle>mvn:org.opendaylight.controller/netconf-notifications-impl/${project.version}</bundle>
+ </feature>
</features>
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
<artifactId>pax-exam-container-karaf</artifactId>
- <version>${pax.exam.version}</version>
+ <version>${exam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
<artifactId>pax-exam</artifactId>
- <version>${pax.exam.version}</version>
+ <version>${exam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>karaf-parent</artifactId>
<name>${project.artifactId}</name>
<packaging>pom</packaging>
- <prerequisites>
- <maven>3.1.1</maven>
- </prerequisites>
+
<properties>
<branding.version>1.1.0-SNAPSHOT</branding.version>
<karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
</plugins>
</pluginManagement>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>${enforcer.version}</version>
+ <executions>
+ <execution>
+ <id>enforce-maven</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireMavenVersion>
+ <version>3.1.1</version>
+ </requireMavenVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
</parent>
<artifactId>opendaylight-karaf-empty</artifactId>
<packaging>pom</packaging>
- <prerequisites>
- <maven>3.0</maven>
- </prerequisites>
<dependencies>
<dependency>
</parent>
<artifactId>distribution.opendaylight-karaf</artifactId>
<packaging>pom</packaging>
- <prerequisites>
- <maven>3.0</maven>
- </prerequisites>
<dependencies>
<dependency>
<northbound.jolokia.version>1.5.0-SNAPSHOT</northbound.jolokia.version>
<opendaylight-l2-types.version>2013.08.27.7-SNAPSHOT</opendaylight-l2-types.version>
<osgi-brandfragment.web.version>0.1.0-SNAPSHOT</osgi-brandfragment.web.version>
- <pax.exam.version>4.0.0</pax.exam.version>
<parboiled.version>1.1.6</parboiled.version>
<parboiled.scala.version>1.1.6</parboiled.scala.version>
<propertymavenplugin.version>1.0-alpha-2</propertymavenplugin.version>
<yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
<sshd-core.version>0.12.0</sshd-core.version>
<jmh.version>0.9.7</jmh.version>
+ <lmax.version>3.3.0</lmax.version>
</properties>
<dependencyManagement>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${lmax.version}</version>
+ </dependency>
+
<!-- 3rd party dependencies needed by config-->
<dependency>
<groupId>com.jcabi</groupId>
<artifactId>config-plugin-parent</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<build>
<pluginManagement>
<artifactId>logback-config-loader</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<artifactId>logback-config</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<artifactId>netty-config-api</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
<description>Configuration Wrapper around netty's event executor</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
<description>Configuration Wrapper around netty's event group</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
<description>Configuration Wrapper around netty's timer</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<version>0.3.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
+
<modules>
<module>config-api</module>
<module>config-manager</module>
<artifactId>threadpool-config-api</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<artifactId>threadpool-config-impl</artifactId>
<packaging>bundle</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
assertThat(runtimeBeans.size(), is(4));
{
- RuntimeBeanEntry streamRB = findFirstByYangName(runtimeBeans,
- "stream");
+ RuntimeBeanEntry streamRB = findFirstByNamePrefix(runtimeBeans,
+ "ThreadStream");
assertNotNull(streamRB);
assertFalse(streamRB.getKeyYangName().isPresent());
assertFalse(streamRB.getKeyJavaName().isPresent());
+ " in " + runtimeBeans);
}
+ protected RuntimeBeanEntry findFirstByNamePrefix(final Collection<RuntimeBeanEntry> runtimeBeans, final String namePrefix) {
+ for (RuntimeBeanEntry rb : runtimeBeans) {
+ if (namePrefix.equals(rb.getJavaNamePrefix())) {
+ return rb;
+ }
+ }
+
+ throw new IllegalArgumentException("Name prefix not found:" + namePrefix
+ + " in " + runtimeBeans);
+ }
+
@Test
public void testGetWhenConditionMatcher() {
assertMatches("config",
assertThat(threadRB.getRpcs().size(), is(2));
}
{
- RuntimeBeanEntry streamRB = findFirstByYangName(runtimeBeans,
- "stream");
+ RuntimeBeanEntry streamRB = findFirstByNamePrefix(runtimeBeans,
+ "ThreadStream");
assertNotNull(streamRB);
assertFalse(streamRB.getKeyYangName().isPresent());
assertFalse(streamRB.getKeyJavaName().isPresent());
<name>${project.artifactId}</name>
<description>Remove generated source files, after new files generation, implementation is inserted.</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
<name>${project.artifactId}</name>
<description>Artifact that contains only generated code from yang files. Suitable for testing.</description>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<dependencies>
<dependency>
* This will stop the timeout clock
*/
void markFollowerInActive();
+
+
+ /**
+ * This will return the active time of follower, since it was last reset
+ * @return time in milliseconds
+ */
+ long timeSinceLastActivity();
+
}
stopwatch.stop();
}
}
+
+ @Override
+ public long timeSinceLastActivity() {
+ return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ }
}
public class Snapshot implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -8298574936724056236L;
+
private final byte[] state;
private final List<ReplicatedLogEntry> unAppliedEntries;
private final long lastIndex;
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to commit an entry to the log
- */
-public class CommitEntry implements Serializable {
- private static final long serialVersionUID = 1L;
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to Persist an entry into the transaction journal
- */
-public class PersistEntry implements Serializable {
- private static final long serialVersionUID = 1L;
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * This message is sent by a RaftActor to itself so that a subclass can process
- * it and use it to save it's state
- */
-public class SaveSnapshot implements Serializable {
- private static final long serialVersionUID = 1L;
-}
purgeInMemoryLog();
}
+ //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+ sendUpdatesToFollower(followerId, followerLogInformation, false);
return this;
}
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+ rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
return switchBehavior(new Follower(context));
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
String followerId = reply.getFollowerId();
FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+ if (followerToSnapshot == null) {
+ LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+ context.getId(), followerId);
+ return;
+ }
+
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
followerLogInformation.markFollowerActive();
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+ if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ boolean wasLastChunk = false;
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
// we can remove snapshot from the memory
setSnapshot(Optional.<ByteString>absent());
}
+ wasLastChunk = true;
} else {
followerToSnapshot.markSendStatus(true);
followerToSnapshot.markSendStatus(false);
}
+ if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if(followerActor != null) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+
} else {
- LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
+ LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+ context.getId(), reply.getChunkIndex(), followerId,
+ followerToSnapshot.getChunkIndex());
if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
private void sendAppendEntries() {
// Send an AppendEntries to all followers
+ long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ final FollowerLogInformation followerLogInformation = e.getValue();
+ // This checks helps not to send a repeat message to the follower
+ if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+ sendUpdatesToFollower(followerId, followerLogInformation, true);
+ }
+ }
+ }
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
-
- if (mapFollowerToSnapshot.get(followerId) != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
- }
+ /**
+ *
+ * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+ * sending next snapshot chunk, and initiating a snapshot.
+ * @return true if any update is sent, false otherwise
+ */
- } else {
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- final List<ReplicatedLogEntry> entries;
-
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex ) {
- // if the followers next index is not present in the leaders log, and
- // if the follower is just not starting and if leader's index is more than followers index
- // then snapshot should be sent
-
- if(LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
- "follower-nextIndex: %s, leader-snapshot-index: %s, " +
- "leader-last-index: %s", context.getId(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
- actor().tell(new InitiateInstallSnapshot(), actor());
-
- // we would want to sent AE as the capture snapshot might take time
- entries = Collections.<ReplicatedLogEntry>emptyList();
-
- } else {
- //we send an AppendEntries, even if the follower is inactive
- // in-order to update the followers timestamp, in case it becomes active again
- entries = Collections.<ReplicatedLogEntry>emptyList();
+ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+ boolean sendHeartbeat) {
+
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+
+ if (mapFollowerToSnapshot.get(followerId) != null) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerId);
+ } else if(sendHeartbeat) {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ }
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ if (isFollowerActive &&
+ context.getReplicatedLog().isPresent(followerNextIndex)) {
+ // FIXME : Sending one entry at a time
+ final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex >= followerNextIndex) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("InitiateInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+ );
}
+ actor().tell(new InitiateInstallSnapshot(), actor());
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+ // Send heartbeat to follower whenever install snapshot is initiated.
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ } else if(sendHeartbeat) {
+ //we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
}
+
}
}
}
private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(),
- replicatedToAllIndex).toSerializable(),
- actor()
- );
+ List<ReplicatedLogEntry> entries, String followerId) {
+ AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex(), replicatedToAllIndex);
+
+ if(!entries.isEmpty()) {
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+ appendEntries);
+ }
+
+ followerActor.tell(appendEntries.toSerializable(), actor());
}
/**
*
*/
private void installSnapshotIfNeeded() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+ }
+
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
sendSnapshotChunk(followerActor, e.getKey());
- } else {
+ } else if (!context.isSnapshotCaptureInitiated()) {
initiateCaptureSnapshot();
//we just need 1 follower who would need snapshot to be installed.
// when we have the snapshot captured, we would again check (in SendInstallSnapshot)
actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
actor());
+ context.setSnapshotCaptureInitiated(true);
}
private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
try {
if (snapshot.isPresent()) {
+ ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+ // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+ // followerId to the followerToSnapshot map.
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
followerActor.tell(
new InstallSnapshot(currentTerm(), context.getId(),
context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks(),
- Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+ nextSnapshotChunk,
+ followerToSnapshot.incrementChunkIndex(),
+ followerToSnapshot.getTotalChunks(),
+ Optional.of(followerToSnapshot.getLastChunkHashCode())
).toSerializable(),
actor()
);
LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
context.getId(), followerActor.path(),
- mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ followerToSnapshot.getChunkIndex(),
+ followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
* Reply for the AppendEntriesRpc message
*/
public class AppendEntriesReply extends AbstractRaftRPC {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -7487547356392536683L;
// true if follower contained entry matching
// prevLogIndex and prevLogTerm
this.logLastTerm = logLastTerm;
}
+ @Override
public long getTerm() {
return term;
}
package org.opendaylight.controller.cluster.raft.messages;
public class InstallSnapshotReply extends AbstractRaftRPC {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 642227896390779503L;
// The followerId - this will be used to figure out which follower is
// responding
private final String followerId;
private final int chunkIndex;
- private boolean success;
+ private final boolean success;
public InstallSnapshotReply(long term, String followerId, int chunkIndex,
boolean success) {
* Invoked by candidates to gather votes (§5.2).
*/
public class RequestVote extends AbstractRaftRPC {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -6967509186297108657L;
// candidate requesting vote
private String candidateId;
public RequestVote() {
}
+ @Override
public long getTerm() {
return term;
}
package org.opendaylight.controller.cluster.raft.messages;
public class RequestVoteReply extends AbstractRaftRPC {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 8427899326488775660L;
// true means candidate received vot
private final boolean voteGranted;
this.voteGranted = voteGranted;
}
+ @Override
public long getTerm() {
return term;
}
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
};
}
+ @Test
+ public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ String persistenceId = "leader1";
+
+ ActorRef followerActor1 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef followerActor2 =
+ getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+ leaderActor.getRaftActorContext().setCommitIndex(9);
+ leaderActor.getRaftActorContext().setLastApplied(9);
+ leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ // create 5 entries in the log
+ MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+ leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+ //set the snapshot index to 4 , 0 to 4 are snapshotted
+ leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+
+ // set the 2nd follower nextIndex to 1 which has been snapshotted
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1));
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+
+ // simulate a real snapshot
+ leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
+ assertEquals(5, leaderActor.getReplicatedLog().size());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ //reply from a slow follower does not initiate a fake snapshot
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
+ assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
+
+ ByteString snapshotBytes = fromObject(Arrays.asList(
+ new MockRaftActorContext.MockPayload("foo-0"),
+ new MockRaftActorContext.MockPayload("foo-1"),
+ new MockRaftActorContext.MockPayload("foo-2"),
+ new MockRaftActorContext.MockPayload("foo-3"),
+ new MockRaftActorContext.MockPayload("foo-4")));
+ leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+ assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+ assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+
+ //reply from a slow follower after should not raise errors
+ leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1));
+ assertEquals(0, leaderActor.getReplicatedLog().size());
+
+ mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+ }
+ };
+ }
+
+
+
private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
package org.opendaylight.controller.cluster.raft.behaviors;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
public class LeaderTest extends AbstractRaftActorBehaviorTest {
private final ActorRef leaderActor =
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
leader.handleMessage(senderActor, new SendHeartBeat());
final String out =
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
RaftActorBehavior raftBehavior = leader
.handleMessage(senderActor, new Replicate(null, null,
new MockRaftActorContext.MockReplicatedLogEntry(1,
leader.getFollowerToSnapshot().getNextChunk();
leader.getFollowerToSnapshot().incrementChunkIndex();
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
//update follower timestamp
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
senderActor, new Replicate(null, "state-id", entry));
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
+
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+ List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+ assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size());
+
}};
}
assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
+ @Test
+ public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ };
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ actorContext.setConfigParams(configParams);
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(1, objectList.size());
+
+ Object o = objectList.get(0);
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(2, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(3, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ // Count should still stay at 3
+ assertEquals(3, objectList.size());
+ }};
+ }
+
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
new JavaTestKit(getSystem()) {{
TestActorRef<MessageCollectorActor> followerActor =
assertEquals(3, installSnapshot.getTotalChunks());
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ followerActor.path().toString(), -1, false));
+
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, new SendHeartBeat());
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+ o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
{
TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
o = MessageCollectorActor.getAllMessages(followerActor).get(1);
assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
Leader leader = new Leader(leaderActorContext);
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
Leader leader = new Leader(leaderActorContext);
leader.markFollowerActive(followerActor.path().toString());
+ Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
}};
}
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive("follower-reply");
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ List<Object> entries = ForwardMessageToBehaviorActor
+ .getAllMatching(followerActor, AppendEntries.class);
+
+ assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+ AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+ assertEquals(1, appendEntriesSecond.getLeaderCommit());
+ assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+ assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+ }};
+ }
+
class MockLeader extends Leader {
FollowerToSnapshot fts;
import static org.ops4j.pax.exam.CoreOptions.frameworkProperty;
import static org.ops4j.pax.exam.CoreOptions.junitBundles;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.systemPackages;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
-
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.options.DefaultCompositeOption;
import org.ops4j.pax.exam.util.PathUtils;
bindingAwareSalBundles(),
mavenBundle("commons-codec", "commons-codec").versionAsInProject(),
- systemProperty("org.osgi.framework.system.packages.extra").value("sun.nio.ch"),
+ systemPackages("sun.nio.ch", "sun.misc"),
mavenBundle("io.netty", "netty-common").versionAsInProject(), //
mavenBundle("io.netty", "netty-buffer").versionAsInProject(), //
mavenBundle("io.netty", "netty-handler").versionAsInProject(), //
mavenBundle(CONTROLLER, "sal-common-util").versionAsInProject(), // //
- mavenBundle(CONTROLLER, "sal-inmemory-datastore").versionAsInProject(), // /
+ mavenBundle("com.lmax", "disruptor").versionAsInProject(),
+ mavenBundle(CONTROLLER, "sal-inmemory-datastore").versionAsInProject(), //
mavenBundle(CONTROLLER, "sal-broker-impl").versionAsInProject(), // //
mavenBundle(CONTROLLER, "sal-core-spi").versionAsInProject().update(), //
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain
+ * valid serialized data.
+ *
+ * @author Thomas Pantelis
+ */
+public class InvalidNormalizedNodeStreamException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidNormalizedNodeStreamException(String message) {
+ super(message);
+ }
+}
private final StringBuilder reusableStringBuilder = new StringBuilder(50);
+ private boolean readSignatureMarker = true;
+
public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
input = new DataInputStream(stream);
@Override
public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+ readSignatureMarkerAndVersionIfNeeded();
+ return readNormalizedNodeInternal();
+ }
+
+ private void readSignatureMarkerAndVersionIfNeeded() throws IOException {
+ if(readSignatureMarker) {
+ readSignatureMarker = false;
+
+ byte marker = input.readByte();
+ if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) {
+ throw new InvalidNormalizedNodeStreamException(String.format(
+ "Invalid signature marker: %d", marker));
+ }
+
+ input.readShort(); // read the version - not currently used/needed.
+ }
+ }
+
+ private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
// each node should start with a byte
byte nodeType = input.readByte();
return bytes;
case ValueTypes.YANG_IDENTIFIER_TYPE :
- return readYangInstanceIdentifier();
+ return readYangInstanceIdentifierInternal();
default :
return null;
}
public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
+ readSignatureMarkerAndVersionIfNeeded();
+ return readYangInstanceIdentifierInternal();
+ }
+
+ private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException {
int size = input.readInt();
List<PathArgument> pathArguments = new ArrayList<>(size);
lastLeafSetQName = nodeType;
- LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+ LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
while(child != null) {
builder.withChild(child);
- child = (LeafSetEntryNode<Object>)readNormalizedNode();
+ child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
}
return builder;
}
NormalizedNodeContainerBuilder builder) throws IOException {
LOG.debug("Reading data container (leaf nodes) nodes");
- NormalizedNode<?, ?> child = readNormalizedNode();
+ NormalizedNode<?, ?> child = readNormalizedNodeInternal();
while(child != null) {
builder.addChild(child);
- child = readNormalizedNode();
+ child = readNormalizedNodeInternal();
}
return builder;
}
private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
+ static final byte SIGNATURE_MARKER = (byte) 0xab;
+ static final short CURRENT_VERSION = (short) 1;
+
static final byte IS_CODE_VALUE = 1;
static final byte IS_STRING_VALUE = 2;
static final byte IS_NULL_VALUE = 3;
private NormalizedNodeWriter normalizedNodeWriter;
+ private boolean wroteSignatureMarker;
+
public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
output = new DataOutputStream(stream);
}
public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+ writeSignatureMarkerAndVersionIfNeeded();
normalizedNodeWriter().write(node);
}
+ private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+ if(!wroteSignatureMarker) {
+ output.writeByte(SIGNATURE_MARKER);
+ output.writeShort(CURRENT_VERSION);
+ wroteSignatureMarker = true;
+ }
+ }
+
@Override
public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
Preconditions.checkNotNull(name, "Node identifier should not be null");
private void startNode(final QName qName, byte nodeType) throws IOException {
Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+ writeSignatureMarkerAndVersionIfNeeded();
+
// First write the type of node
output.writeByte(nodeType);
// Write Start Tag
}
public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+ writeSignatureMarkerAndVersionIfNeeded();
+ writeYangInstanceIdentifierInternal(identifier);
+ }
+
+ private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
int size = Iterables.size(pathArguments);
output.writeInt(size);
output.write(bytes);
break;
case ValueTypes.YANG_IDENTIFIER_TYPE:
- writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+ writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
break;
case ValueTypes.NULL_TYPE :
break;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
public class NormalizedNodeStreamReaderWriterTest {
@Test
- public void testNormalizedNodeStreamReaderWriter() throws IOException {
+ public void testNormalizedNodeStreaming() throws IOException {
- testNormalizedNodeStreamReaderWriter(createTestContainer());
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+
+ NormalizedNode<?, ?> testContainer = createTestContainer();
+ writer.writeNormalizedNode(testContainer);
QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
withNodeIdentifier(new NodeIdentifier(toaster)).
withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build();
- testNormalizedNodeStreamReaderWriter(Builders.containerBuilder().
+ ContainerNode toasterContainer = Builders.containerBuilder().
withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)).
- withChild(toasterNode).build());
+ withChild(toasterNode).build();
+ writer.writeNormalizedNode(toasterContainer);
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+ NormalizedNode<?,?> node = reader.readNormalizedNode();
+ Assert.assertEquals(testContainer, node);
+
+ node = reader.readNormalizedNode();
+ Assert.assertEquals(toasterContainer, node);
+
+ writer.close();
}
private NormalizedNode<?, ?> createTestContainer() {
build();
}
- private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
+ @Test
+ public void testYangInstanceIdentifierStreaming() throws IOException {
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+ node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+ TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
- byte[] byteData = null;
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeOutputStreamWriter writer =
+ new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+ writer.writeYangInstanceIdentifier(path);
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+ YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+ Assert.assertEquals(path, newPath);
+
+ writer.close();
+ }
+
+ @Test
+ public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException {
- try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
- NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
- normalizedNodeWriter.write(input);
- byteData = byteArrayOutputStream.toByteArray();
+ NormalizedNode<?, ?> testContainer = TestModel.createBaseTestContainerBuilder().build();
+ writer.writeNormalizedNode(testContainer);
- }
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+ node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+ TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
+
+ writer.writeYangInstanceIdentifier(path);
NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
- new ByteArrayInputStream(byteData));
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
NormalizedNode<?,?> node = reader.readNormalizedNode();
- Assert.assertEquals(input, node);
+ Assert.assertEquals(testContainer, node);
+
+ YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+ Assert.assertEquals(path, newPath);
+
+ writer.close();
+ }
+
+ @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+ public void testInvalidNormalizedNodeStream() throws IOException {
+ byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode(
+ TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray();
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(protobufBytes));
+
+ reader.readNormalizedNode();
+ }
+
+ @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+ public void testInvalidYangInstanceIdentifierStream() throws IOException {
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build();
+
+ byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+ InstanceIdentifierUtils.toSerializable(path)).build().toByteArray();
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(protobufBytes));
+
+ reader.readYangInstanceIdentifier();
}
@Test
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
/**
* The ShardManager has the following jobs,
}
static class SchemaContextModules implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -8884620101025936590L;
+
private final Set<String> modules;
SchemaContextModules(Set<String> modules){
protected DOMStoreTransaction getDOMStoreTransaction() {
return transaction;
}
+
+ @Override
+ protected boolean returnCloseTransactionReply() {
+ return false;
+ }
}
* The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
* </p>
* <p>
- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
- * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
- * time there are no known advantages for creating a read-only or write-only transaction which may change over time
- * at which point we can optimize things in the distributed store as well.
- * </p>
- * <p>
* Handles Messages <br/>
* ---------------- <br/>
* <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
}
}
+ protected boolean returnCloseTransactionReply() {
+ return true;
+ }
+
private void closeTransaction(boolean sendReply) {
getDOMStoreTransaction().close();
- if(sendReply) {
+ if(sendReply && returnCloseTransactionReply()) {
getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
}
import java.io.DataOutputStream;
import java.io.IOException;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
}
public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
- try {
- boolean present = in.readBoolean();
- if(present) {
- NormalizedNodeInputStreamReader streamReader = streamReader(in);
- return streamReader.readNormalizedNode();
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
- }
+ try {
+ return tryDeserializeNormalizedNode(in);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+ }
+ }
+
+ private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+ boolean present = in.readBoolean();
+ if(present) {
+ NormalizedNodeInputStreamReader streamReader = streamReader(in);
+ return streamReader.readNormalizedNode();
+ }
return null;
}
public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
NormalizedNode<?, ?> node = null;
try {
- node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
- } catch(Exception e) {
- }
-
- if(node == null) {
- // Must be from legacy protobuf serialization - try that.
+ node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+ } catch(InvalidNormalizedNodeStreamException e) {
+ // Probably from legacy protobuf serialization - try that.
try {
NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
node = new NormalizedNodeToNodeCodec(null).decode(serializedNode);
- } catch (InvalidProtocolBufferException e) {
+ } catch (InvalidProtocolBufferException e2) {
throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
}
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
}
return node;
}
@Test
- public void testOnReceiveCloseTransaction() throws Exception {
+ public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
- "testCloseTransaction");
+ "testReadWriteTxOnReceiveCloseTransaction");
watch(transaction);
}};
}
+ @Test
+ public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testWriteTxOnReceiveCloseTransaction");
+
+ watch(transaction);
+
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+ expectTerminated(duration("3 seconds"), transaction);
+ }};
+ }
+
+ @Test
+ public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+ "testReadOnlyTxOnReceiveCloseTransaction");
+
+ watch(transaction);
+
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), Terminated.class);
+ }};
+ }
+
@Test(expected=UnknownMessageException.class)
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
<artifactId>guava</artifactId>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>ietf-yang-types</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableMultimap.Builder;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
+ * routing of notifications from publishers to subscribers.
+ *
+ * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications
+ * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration
+ * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we
+ * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize
+ * on this instance, notifications do not take any locks here.
+ *
+ * The fully-blocking {@link #publish(long, DOMNotification, Collection)} and non-blocking {@link #offerNotification(DOMNotification)}
+ * are realized using the Disruptor's native operations. The bounded-blocking {@link #offerNotification(DOMNotification, long, TimeUnit)}
+ * is realized by arming a background wakeup interrupt.
+ */
+public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService {
+ private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
+ private static final WaitStrategy DEFAULT_STRATEGY = new SleepingWaitStrategy();
+ private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS = new EventHandler<DOMNotificationRouterEvent>() {
+ @Override
+ public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) throws Exception {
+ event.deliverNotification();
+
+ }
+ };
+ private static final EventHandler<DOMNotificationRouterEvent> NOTIFY_FUTURE = new EventHandler<DOMNotificationRouterEvent>() {
+ @Override
+ public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) {
+ event.setFuture();
+ }
+ };
+
+ private final Disruptor<DOMNotificationRouterEvent> disruptor;
+ private final ExecutorService executor;
+ private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners = ImmutableMultimap.of();
+
+ private DOMNotificationRouter(final ExecutorService executor, final Disruptor<DOMNotificationRouterEvent> disruptor) {
+ this.executor = Preconditions.checkNotNull(executor);
+ this.disruptor = Preconditions.checkNotNull(disruptor);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static DOMNotificationRouter create(final int queueDepth) {
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ final Disruptor<DOMNotificationRouterEvent> disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY);
+
+ disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
+ disruptor.start();
+
+ return new DOMNotificationRouter(executor, disruptor);
+ }
+
+ @Override
+ public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener, final Collection<SchemaPath> types) {
+ final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
+ @Override
+ protected void removeRegistration() {
+ final ListenerRegistration<T> me = this;
+
+ synchronized (DOMNotificationRouter.this) {
+ listeners = ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, new Predicate<ListenerRegistration<? extends DOMNotificationListener>>() {
+ @Override
+ public boolean apply(final ListenerRegistration<? extends DOMNotificationListener> input) {
+ return input != me;
+ }
+ }));
+ }
+ }
+ };
+
+ if (!types.isEmpty()) {
+ final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b = ImmutableMultimap.builder();
+ b.putAll(listeners);
+
+ for (SchemaPath t : types) {
+ b.put(t, reg);
+ }
+
+ listeners = b.build();
+ }
+
+ return reg;
+ }
+
+ @Override
+ public <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener, final SchemaPath... types) {
+ return registerNotificationListener(listener, Arrays.asList(types));
+ }
+
+ private ListenableFuture<Void> publish(final long seq, final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ final DOMNotificationRouterEvent event = disruptor.get(seq);
+ final ListenableFuture<Void> future = event.initialize(notification, subscribers);
+ disruptor.getRingBuffer().publish(seq);
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> putNotification(final DOMNotification notification) throws InterruptedException {
+ final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
+ if (subscribers.isEmpty()) {
+ return NO_LISTENERS;
+ }
+
+ final long seq = disruptor.getRingBuffer().next();
+ return publish(seq, notification, subscribers);
+ }
+
+ private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ final long seq;
+ try {
+ seq = disruptor.getRingBuffer().tryNext();
+ } catch (InsufficientCapacityException e) {
+ return DOMNotificationPublishService.REJECTED;
+ }
+
+ return publish(seq, notification, subscribers);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
+ final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
+ if (subscribers.isEmpty()) {
+ return NO_LISTENERS;
+ }
+
+ return tryPublish(notification, subscribers);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
+ final TimeUnit unit) throws InterruptedException {
+ final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
+ if (subscribers.isEmpty()) {
+ return NO_LISTENERS;
+ }
+
+ // Attempt to perform a non-blocking publish first
+ final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+ if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
+ return noBlock;
+ }
+
+ /*
+ * FIXME: we need a background thread, which will watch out for blocking too long. Here
+ * we will arm a tasklet for it and synchronize delivery of interrupt properly.
+ */
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public void close() {
+ disruptor.shutdown();
+ executor.shutdown();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.lmax.disruptor.EventFactory;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A single notification event in the disruptor ringbuffer. These objects are reused,
+ * so they do have mutable state.
+ */
+final class DOMNotificationRouterEvent {
+ public static final EventFactory<DOMNotificationRouterEvent> FACTORY = new EventFactory<DOMNotificationRouterEvent>() {
+ @Override
+ public DOMNotificationRouterEvent newInstance() {
+ return new DOMNotificationRouterEvent();
+ }
+ };
+
+ private Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers;
+ private DOMNotification notification;
+ private SettableFuture<Void> future;
+
+ private DOMNotificationRouterEvent() {
+ // Hidden on purpose, initialized in initialize()
+ }
+
+ ListenableFuture<Void> initialize(final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ this.notification = Preconditions.checkNotNull(notification);
+ this.subscribers = Preconditions.checkNotNull(subscribers);
+ this.future = SettableFuture.create();
+ return this.future;
+ }
+
+ void deliverNotification() {
+ for (ListenerRegistration<? extends DOMNotificationListener> r : subscribers) {
+ final DOMNotificationListener l = r.getInstance();
+ if (l != null) {
+ l.onNotification(notification);
+ }
+ }
+ }
+
+ void setFuture() {
+ future.set(null);
+ }
+
+}
\ No newline at end of file
*/
final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
- LOG.debug("Transaction {} readied");
+ LOG.debug("Transaction {} readied", tx);
/*
* We do not see a transaction being in-flight, so we need to take care of dispatching
<version>1.2.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-distributed-datastore</artifactId>
+ </dependency>
<!-- Test Dependencies -->
<dependency>
import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-
import java.io.Serializable;
+import org.opendaylight.yangtools.yang.common.QName;
public class ExecuteRpc implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1128904894827335676L;
- private final String inputCompositeNode;
- private final QName rpc;
+ private final String inputCompositeNode;
+ private final QName rpc;
- public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
- Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
- Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
+ public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
+ Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
+ Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
- this.inputCompositeNode = inputCompositeNode;
- this.rpc = rpc;
- }
+ this.inputCompositeNode = inputCompositeNode;
+ this.rpc = rpc;
+ }
- public String getInputCompositeNode() {
- return inputCompositeNode;
- }
+ public String getInputCompositeNode() {
+ return inputCompositeNode;
+ }
- public QName getRpc() {
- return rpc;
- }
+ public QName getRpc() {
+ return rpc;
+ }
}
package org.opendaylight.controller.remote.rpc.messages;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import java.io.Serializable;
-
public class InvokeRpc implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -2813459607858108953L;
- private final QName rpc;
- private final YangInstanceIdentifier identifier;
- private final CompositeNode input;
+ private final QName rpc;
+ private final YangInstanceIdentifier identifier;
+ private final CompositeNode input;
- public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
- Preconditions.checkNotNull(rpc, "rpc qname should not be null");
- Preconditions.checkNotNull(input, "rpc input should not be null");
+ public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
+ Preconditions.checkNotNull(rpc, "rpc qname should not be null");
+ Preconditions.checkNotNull(input, "rpc input should not be null");
- this.rpc = rpc;
- this.identifier = identifier;
- this.input = input;
- }
+ this.rpc = rpc;
+ this.identifier = identifier;
+ this.input = input;
+ }
- public QName getRpc() {
- return rpc;
- }
+ public QName getRpc() {
+ return rpc;
+ }
- public YangInstanceIdentifier getIdentifier() {
- return identifier;
- }
+ public YangInstanceIdentifier getIdentifier() {
+ return identifier;
+ }
- public CompositeNode getInput() {
- return input;
- }
+ public CompositeNode getInput() {
+ return input;
+ }
}
import java.io.Serializable;
public class RpcResponse implements Serializable {
- private static final long serialVersionUID = 1L;
- private final String resultCompositeNode;
+ private static final long serialVersionUID = -4211279498688989245L;
- public RpcResponse(final String resultCompositeNode) {
- this.resultCompositeNode = resultCompositeNode;
- }
+ private final String resultCompositeNode;
- public String getResultCompositeNode() {
- return resultCompositeNode;
- }
+ public RpcResponse(final String resultCompositeNode) {
+ this.resultCompositeNode = resultCompositeNode;
+ }
+
+ public String getResultCompositeNode() {
+ return resultCompositeNode;
+ }
}
import org.opendaylight.controller.sal.connector.api.RpcRouter;
public class RoutingTable implements Copier<RoutingTable>, Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 5592610415175278760L;
private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
private ActorRef router;
import java.io.Serializable;
public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 294779770032719196L;
private Long version = System.currentTimeMillis();
}
public static class ContainsBuckets implements Serializable{
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -4940160367495308286L;
+
private final Map<Address, Bucket> buckets;
public ContainsBuckets(Map<Address, Bucket> buckets){
}
public static class ContainsBucketVersions implements Serializable{
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -8172148925383801613L;
+
Map<Address, Long> versions;
public ContainsBucketVersions(Map<Address, Long> versions) {
public static class GossiperMessages{
public static class Tick implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -4770935099506366773L;
}
public static final class GossipTick extends Tick {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 5803354404380026143L;
}
public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -593037395143883265L;
+
private final Address from;
public GossipStatus(Address from, Map<Address, Long> versions) {
}
public static final class GossipEnvelope extends ContainsBuckets implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 8346634072582438818L;
+
private final Address from;
private final Address to;
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-subsystem</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>ietf-netconf-notifications</artifactId>
+ <packaging>bundle</packaging>
+ <name>${project.artifactId}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools.model</groupId>
+ <artifactId>ietf-yang-types</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.*,
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.*,
+ org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.*
+ </Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+module ietf-netconf-notifications {
+
+ namespace
+ "urn:ietf:params:xml:ns:yang:ietf-netconf-notifications";
+
+ prefix ncn;
+
+ import ietf-inet-types { prefix inet; }
+ import ietf-netconf { prefix nc; }
+
+ organization
+ "IETF NETCONF (Network Configuration Protocol) Working Group";
+
+ contact
+ "WG Web: <http://tools.ietf.org/wg/netconf/>
+ WG List: <mailto:netconf@ietf.org>
+
+ WG Chair: Bert Wijnen
+ <mailto:bertietf@bwijnen.net>
+
+ WG Chair: Mehmet Ersue
+ <mailto:mehmet.ersue@nsn.com>
+
+ Editor: Andy Bierman
+ <mailto:andy@netconfcentral.org>";
+
+ description
+ "This module defines a YANG data model for use with the
+ NETCONF protocol that allows the NETCONF client to
+ receive common NETCONF base event notifications.
+
+ Copyright (c) 2012 IETF Trust and the persons identified as
+ the document authors. All rights reserved.
+
+ Redistribution and use in source and binary forms, with or
+ without modification, is permitted pursuant to, and subject
+ to the license terms contained in, the Simplified BSD License
+
+
+
+ set forth in Section 4.c of the IETF Trust's Legal Provisions
+ Relating to IETF Documents
+ (http://trustee.ietf.org/license-info).
+
+ This version of this YANG module is part of RFC 6470; see
+ the RFC itself for full legal notices.";
+
+ revision "2012-02-06" {
+ description
+ "Initial version. Errata 3957 added.";
+ reference
+ "RFC 6470: NETCONF Base Notifications";
+ }
+
+ grouping common-session-parms {
+ description
+ "Common session parameters to identify a
+ management session.";
+
+ leaf username {
+ type string;
+ mandatory true;
+ description
+ "Name of the user for the session.";
+ }
+
+ leaf session-id {
+ type nc:session-id-or-zero-type;
+ mandatory true;
+ description
+ "Identifier of the session.
+ A NETCONF session MUST be identified by a non-zero value.
+ A non-NETCONF session MAY be identified by the value zero.";
+ }
+
+ leaf source-host {
+ type inet:ip-address;
+ description
+ "Address of the remote host for the session.";
+ }
+ }
+
+
+
+
+
+
+
+
+ grouping changed-by-parms {
+ description
+ "Common parameters to identify the source
+ of a change event, such as a configuration
+ or capability change.";
+
+ container changed-by {
+ description
+ "Indicates the source of the change.
+ If caused by internal action, then the
+ empty leaf 'server' will be present.
+ If caused by a management session, then
+ the name, remote host address, and session ID
+ of the session that made the change will be reported.";
+ choice server-or-user {
+ mandatory true;
+ leaf server {
+ type empty;
+ description
+ "If present, the change was caused
+ by the server.";
+ }
+
+ case by-user {
+ uses common-session-parms;
+ }
+ } // choice server-or-user
+ } // container changed-by-parms
+ }
+
+
+ notification netconf-config-change {
+ description
+ "Generated when the NETCONF server detects that the
+ <running> or <startup> configuration datastore
+ has been changed by a management session.
+ The notification summarizes the edits that
+ have been detected.
+
+ The server MAY choose to also generate this
+ notification while loading a datastore during the
+ boot process for the device.";
+
+ uses changed-by-parms;
+
+
+
+
+
+ leaf datastore {
+ type enumeration {
+ enum running {
+ description "The <running> datastore has changed.";
+ }
+ enum startup {
+ description "The <startup> datastore has changed";
+ }
+ }
+ default "running";
+ description
+ "Indicates which configuration datastore has changed.";
+ }
+
+ list edit {
+ description
+ "An edit record SHOULD be present for each distinct
+ edit operation that the server has detected on
+ the target datastore. This list MAY be omitted
+ if the detailed edit operations are not known.
+ The server MAY report entries in this list for
+ changes not made by a NETCONF session (e.g., CLI).";
+
+ leaf target {
+ type instance-identifier;
+ description
+ "Topmost node associated with the configuration change.
+ A server SHOULD set this object to the node within
+ the datastore that is being altered. A server MAY
+ set this object to one of the ancestors of the actual
+ node that was changed, or omit this object, if the
+ exact node is not known.";
+ }
+
+ leaf operation {
+ type nc:edit-operation-type;
+ description
+ "Type of edit operation performed.
+ A server MUST set this object to the NETCONF edit
+ operation performed on the target datastore.";
+ }
+ } // list edit
+ } // notification netconf-config-change
+
+
+
+
+
+
+ notification netconf-capability-change {
+ description
+ "Generated when the NETCONF server detects that
+ the server capabilities have changed.
+ Indicates which capabilities have been added, deleted,
+ and/or modified. The manner in which a server
+ capability is changed is outside the scope of this
+ document.";
+
+ uses changed-by-parms;
+
+ leaf-list added-capability {
+ type inet:uri;
+ description
+ "List of capabilities that have just been added.";
+ }
+
+ leaf-list deleted-capability {
+ type inet:uri;
+ description
+ "List of capabilities that have just been deleted.";
+ }
+
+ leaf-list modified-capability {
+ type inet:uri;
+ description
+ "List of capabilities that have just been modified.
+ A capability is considered to be modified if the
+ base URI for the capability has not changed, but
+ one or more of the parameters encoded at the end of
+ the capability URI have changed.
+ The new modified value of the complete URI is returned.";
+ }
+ } // notification netconf-capability-change
+
+
+ notification netconf-session-start {
+ description
+ "Generated when a NETCONF server detects that a
+ NETCONF session has started. A server MAY generate
+ this event for non-NETCONF management sessions.
+ Indicates the identity of the user that started
+ the session.";
+ uses common-session-parms;
+ } // notification netconf-session-start
+
+
+
+
+ notification netconf-session-end {
+ description
+ "Generated when a NETCONF server detects that a
+ NETCONF session has terminated.
+ A server MAY optionally generate this event for
+ non-NETCONF management sessions. Indicates the
+ identity of the user that owned the session,
+ and why the session was terminated.";
+
+ uses common-session-parms;
+
+ leaf killed-by {
+ when "../termination-reason = 'killed'";
+ type nc:session-id-type;
+ description
+ "The ID of the session that directly caused this session
+ to be abnormally terminated. If this session was abnormally
+ terminated by a non-NETCONF session unknown to the server,
+ then this leaf will not be present.";
+ }
+
+ leaf termination-reason {
+ type enumeration {
+ enum "closed" {
+ description
+ "The session was terminated by the client in normal
+ fashion, e.g., by the NETCONF <close-session>
+ protocol operation.";
+ }
+ enum "killed" {
+ description
+ "The session was terminated in abnormal
+ fashion, e.g., by the NETCONF <kill-session>
+ protocol operation.";
+ }
+ enum "dropped" {
+ description
+ "The session was terminated because the transport layer
+ connection was unexpectedly closed.";
+ }
+ enum "timeout" {
+ description
+ "The session was terminated because of inactivity,
+ e.g., waiting for the <hello> message or <rpc>
+ messages.";
+ }
+
+
+
+ enum "bad-hello" {
+ description
+ "The client's <hello> message was invalid.";
+ }
+ enum "other" {
+ description
+ "The session was terminated for some other reason.";
+ }
+ }
+ mandatory true;
+ description
+ "Reason the session was terminated.";
+ }
+ } // notification netconf-session-end
+
+
+ notification netconf-confirmed-commit {
+ description
+ "Generated when a NETCONF server detects that a
+ confirmed-commit event has occurred. Indicates the event
+ and the current state of the confirmed-commit procedure
+ in progress.";
+ reference
+ "RFC 6241, Section 8.4";
+
+ uses common-session-parms {
+ when "confirm-event != 'timeout'";
+ }
+
+ leaf confirm-event {
+ type enumeration {
+ enum "start" {
+ description
+ "The confirmed-commit procedure has started.";
+ }
+ enum "cancel" {
+ description
+ "The confirmed-commit procedure has been canceled,
+ e.g., due to the session being terminated, or an
+ explicit <cancel-commit> operation.";
+ }
+ enum "timeout" {
+ description
+ "The confirmed-commit procedure has been canceled
+ due to the confirm-timeout interval expiring.
+ The common session parameters will not be present
+ in this sub-mode.";
+ }
+
+ enum "extend" {
+ description
+ "The confirmed-commit timeout has been extended,
+ e.g., by a new <confirmed-commit> operation.";
+ }
+ enum "complete" {
+ description
+ "The confirmed-commit procedure has been completed.";
+ }
+ }
+ mandatory true;
+ description
+ "Indicates the event that caused the notification.";
+ }
+
+ leaf timeout {
+ when
+ "../confirm-event = 'start' or ../confirm-event = 'extend'";
+ type uint32;
+ units "seconds";
+ description
+ "The configured timeout value if the event type
+ is 'start' or 'extend'. This value represents
+ the approximate number of seconds from the event
+ time when the 'timeout' event might occur.";
+ }
+ } // notification netconf-confirmed-commit
+
+}
--- /dev/null
+module nc-notifications {
+
+ namespace "urn:ietf:params:xml:ns:netmod:notification";
+ prefix "manageEvent";
+
+ import ietf-yang-types{ prefix yang; }
+ import notifications { prefix ncEvent; }
+
+ organization
+ "IETF NETCONF WG";
+
+ contact
+ "netconf@ietf.org";
+
+ description
+ "Conversion of the 'manageEvent' XSD in the NETCONF
+ Notifications RFC.";
+
+ reference
+ "RFC 5277";
+
+ revision 2008-07-14 {
+ description "RFC 5277 version.";
+ }
+
+ container netconf {
+ description "Top-level element in the notification namespace";
+
+ config false;
+
+ container streams {
+ description
+ "The list of event streams supported by the system. When
+ a query is issued, the returned set of streams is
+ determined based on user privileges.";
+
+ list stream {
+ description
+ "Stream name, description and other information.";
+ key name;
+ min-elements 1;
+
+ leaf name {
+ description
+ "The name of the event stream. If this is the default
+ NETCONF stream, this must have the value 'NETCONF'.";
+ type ncEvent:streamNameType;
+ }
+
+ leaf description {
+ description
+ "A description of the event stream, including such
+ information as the type of events that are sent over
+ this stream.";
+ type string;
+ mandatory true;
+ }
+
+ leaf replaySupport {
+ description
+ "A description of the event stream, including such
+ information as the type of events that are sent over
+ this stream.";
+ type boolean;
+ mandatory true;
+ }
+
+ leaf replayLogCreationTime {
+ description
+ "The timestamp of the creation of the log used to support
+ the replay function on this stream. Note that this might
+ be earlier then the earliest available notification in
+ the log. This object is updated if the log resets for
+ some reason. This object MUST be present if replay is
+ supported.";
+ type yang:date-and-time; // xsd:dateTime is wrong!
+ }
+ }
+ }
+ }
+
+ notification replayComplete {
+ description
+ "This notification is sent to signal the end of a replay
+ portion of a subscription.";
+ }
+
+ notification notificationComplete {
+ description
+ "This notification is sent to signal the end of a notification
+ subscription. It is sent in the case that stopTime was
+ specified during the creation of the subscription..";
+ }
+
+}
--- /dev/null
+module notifications {
+
+ namespace "urn:ietf:params:xml:ns:netconf:notification:1.0";
+ prefix "ncEvent";
+
+ import ietf-yang-types { prefix yang; }
+
+ organization
+ "IETF NETCONF WG";
+
+ contact
+ "netconf@ops.ietf.org";
+
+ description
+ "Conversion of the 'ncEvent' XSD in the
+ NETCONF Notifications RFC.";
+
+ reference
+ "RFC 5277.";
+
+ revision 2008-07-14 {
+ description "RFC 5277 version.";
+ }
+
+ typedef streamNameType {
+ description
+ "The name of an event stream.";
+ type string;
+ }
+
+ rpc create-subscription {
+ description
+ "The command to create a notification subscription. It
+ takes as argument the name of the notification stream
+ and filter. Both of those options limit the content of
+ the subscription. In addition, there are two time-related
+ parameters, startTime and stopTime, which can be used to
+ select the time interval of interest to the notification
+ replay feature.";
+
+ input {
+ leaf stream {
+ description
+ "An optional parameter that indicates which stream of events
+ is of interest. If not present, then events in the default
+ NETCONF stream will be sent.";
+ type streamNameType;
+ default "NETCONF";
+ }
+
+ anyxml filter {
+ description
+ "An optional parameter that indicates which subset of all
+ possible events is of interest. The format of this
+ parameter is the same as that of the filter parameter
+ in the NETCONF protocol operations. If not present,
+ all events not precluded by other parameters will
+ be sent.";
+ }
+
+ leaf startTime {
+ description
+ "A parameter used to trigger the replay feature and
+ indicates that the replay should start at the time
+ specified. If start time is not present, this is not a
+ replay subscription.";
+ type yang:date-and-time;
+ }
+
+ leaf stopTime {
+ // must ". >= ../startTime";
+ description
+ "An optional parameter used with the optional replay
+ feature to indicate the newest notifications of
+ interest. If stop time is not present, the notifications
+ will continue until the subscription is terminated.
+ Must be used with startTime.";
+ type yang:date-and-time;
+ }
+ }
+ }
+}
+
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>ietf-netconf</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>ietf-netconf-monitoring</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>ietf-netconf-notifications</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-notifications-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-notifications-impl</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netconf-client</artifactId>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.mapping.api;
+
+import org.opendaylight.controller.netconf.api.NetconfSession;
+
+public interface SessionAwareNetconfOperation extends NetconfOperation {
+
+ void setSession(NetconfSession session);
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>netconf-subsystem</artifactId>
+ <groupId>org.opendaylight.controller</groupId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>bundle</packaging>
+ <artifactId>netconf-notifications-api</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>ietf-netconf-notifications</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>org.opendaylight.controller.netconf.notifications.*</Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+
+
+/**
+ * Listener for base netconf notifications defined in https://tools.ietf.org/html/rfc6470.
+ * This listener uses generated classes from yang model defined in RFC6470.
+ * It alleviates the provisioning of base netconf notifications from the code.
+ */
+public interface BaseNetconfNotificationListener {
+
+ /**
+ * Callback used to notify about a change in used capabilities
+ */
+ void onCapabilityChanged(NetconfCapabilityChange capabilityChange);
+
+ // TODO add other base notifications
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Registration for base notification publisher. This registration allows for publishing of base netconf notifications using generated classes
+ */
+public interface BaseNotificationPublisherRegistration extends NotificationRegistration, BaseNetconfNotificationListener {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import com.google.common.base.Preconditions;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Special kind of netconf message that contains a timestamp.
+ */
+public final class NetconfNotification extends NetconfMessage {
+
+ public static final String NOTIFICATION = "notification";
+ public static final String NOTIFICATION_NAMESPACE = "urn:ietf:params:netconf:capability:notification:1.0";
+ public static final String RFC3339_DATE_FORMAT_BLUEPRINT = "yyyy-MM-dd'T'HH:mm:ssXXX";
+ public static final String EVENT_TIME = "eventTime";
+
+ /**
+ * Create new notification and capture the timestamp in the constructor
+ */
+ public NetconfNotification(final Document notificationContent) {
+ this(notificationContent, new Date());
+ }
+
+ /**
+ * Create new notification with provided timestamp
+ */
+ public NetconfNotification(final Document notificationContent, final Date eventTime) {
+ super(wrapNotification(notificationContent, eventTime));
+ }
+
+ private static Document wrapNotification(final Document notificationContent, final Date eventTime) {
+ Preconditions.checkNotNull(notificationContent);
+ Preconditions.checkNotNull(eventTime);
+
+ final Element baseNotification = notificationContent.getDocumentElement();
+ final Element entireNotification = notificationContent.createElementNS(NOTIFICATION_NAMESPACE, NOTIFICATION);
+ entireNotification.appendChild(baseNotification);
+
+ final Element eventTimeElement = notificationContent.createElementNS(NOTIFICATION_NAMESPACE, EVENT_TIME);
+ eventTimeElement.setTextContent(getSerializedEventTime(eventTime));
+ entireNotification.appendChild(eventTimeElement);
+
+ notificationContent.appendChild(entireNotification);
+ return notificationContent;
+ }
+
+ private static String getSerializedEventTime(final Date eventTime) {
+ // SimpleDateFormat is not threadsafe, cannot be in a constant
+ return new SimpleDateFormat(RFC3339_DATE_FORMAT_BLUEPRINT).format(eventTime);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+
+/**
+ * Collector of all notifications. Base or generic
+ */
+public interface NetconfNotificationCollector {
+
+ /**
+ * Add notification publisher for a particular stream
+ *
+ * Implementations should allow for multiple publishers of a single stream
+ * and its up to implementations to decide how to merge metadata (e.g. description)
+ * for the same stream when providing information about available stream
+ *
+ */
+ NotificationPublisherRegistration registerNotificationPublisher(Stream stream);
+
+ /**
+ * Register base notification publisher
+ */
+ BaseNotificationPublisherRegistration registerBaseNotificationPublisher();
+
+ /**
+ * Users of the registry have an option to get notification each time new notification stream gets registered
+ * This allows for a push model in addition to pull model for retrieving information about available streams.
+ *
+ * The listener should receive callbacks for each stream available prior to the registration when its registered
+ */
+ NotificationRegistration registerStreamListener(NetconfNotificationStreamListener listener);
+
+ /**
+ * Simple listener that receives notifications about changes in stream availability
+ */
+ public interface NetconfNotificationStreamListener {
+
+ /**
+ * Stream becomes available in the collector (first publisher is registered)
+ */
+ void onStreamRegistered(Stream stream);
+
+ /**
+ * Stream is not available anymore in the collector (last publisher is unregistered)
+ */
+ void onStreamUnregistered(StreamNameType stream);
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+
+/**
+ * Generic listener for netconf notifications
+ */
+public interface NetconfNotificationListener {
+
+ /**
+ * Callback used to notify the listener about any new notification
+ */
+ void onNotification(StreamNameType stream, NetconfNotification notification);
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+
+/**
+ *
+ */
+public interface NetconfNotificationRegistry {
+
+ /**
+ * Add listener for a certain notification type
+ */
+ NotificationListenerRegistration registerNotificationListener(StreamNameType stream, NetconfNotificationListener listener);
+
+ /**
+ * Check stream availability
+ */
+ boolean isStreamAvailable(StreamNameType streamNameType);
+
+ /**
+ * Get all the streams available
+ */
+ Streams getNotificationPublishers();
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Manages the registration of a single listener
+ */
+public interface NotificationListenerRegistration extends NotificationRegistration {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Registration for notification publisher. This registration allows for publishing any netconf notifications
+ */
+public interface NotificationPublisherRegistration extends NetconfNotificationListener, NotificationRegistration {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications;
+
+/**
+ * Generic registration, used as a base for other registration types
+ */
+public interface NotificationRegistration extends AutoCloseable {
+
+ // Overriden close does not throw any kind of checked exception
+
+ /**
+ * Close the registration.
+ */
+ @Override
+ void close();
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>netconf-subsystem</artifactId>
+ <groupId>org.opendaylight.controller</groupId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>bundle</packaging>
+ <artifactId>netconf-notifications-impl</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-notifications-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>binding-generator-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>binding-data-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>xmlunit</groupId>
+ <artifactId>xmlunit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Bundle-Activator>org.opendaylight.controller.netconf.notifications.impl.osgi.Activator</Bundle-Activator>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.controller.netconf.notifications.NotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NotificationRegistration;
+import org.opendaylight.controller.netconf.notifications.impl.ops.NotificationsTransformUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ThreadSafe
+public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry, NetconfNotificationListener, AutoCloseable {
+
+ public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF");
+ public static final Stream BASE_NETCONF_STREAM;
+
+ static {
+ BASE_NETCONF_STREAM = new StreamBuilder()
+ .setName(BASE_STREAM_NAME)
+ .setKey(new StreamKey(BASE_STREAM_NAME))
+ .setReplaySupport(false)
+ .setDescription("Default Event Stream")
+ .build();
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class);
+
+ // TODO excessive synchronization provides thread safety but is most likely not optimal (combination of concurrent collections might improve performance)
+ // And also calling callbacks from a synchronized block is dangerous since the listeners/publishers can block the whole notification processing
+
+ @GuardedBy("this")
+ private final Multimap<StreamNameType, GenericNotificationListenerReg> notificationListeners = HashMultimap.create();
+
+ @GuardedBy("this")
+ private final Set<NetconfNotificationStreamListener> streamListeners = Sets.newHashSet();
+
+ @GuardedBy("this")
+ private final Map<StreamNameType, Stream> streamMetadata = Maps.newHashMap();
+
+ @GuardedBy("this")
+ private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
+
+ @GuardedBy("this")
+ private final Set<GenericNotificationPublisherReg> notificationPublishers = Sets.newHashSet();
+
+ @Override
+ public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) {
+ LOG.debug("Notification of type {} detected", stream);
+ if(LOG.isTraceEnabled()) {
+ LOG.debug("Notification of type {} detected: {}", stream, notification);
+ }
+
+ for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) {
+ listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification);
+ }
+ }
+
+ @Override
+ public synchronized NotificationListenerRegistration registerNotificationListener(final StreamNameType stream, final NetconfNotificationListener listener) {
+ Preconditions.checkNotNull(stream);
+ Preconditions.checkNotNull(listener);
+
+ LOG.trace("Notification listener registered for stream: {}", stream);
+
+ final GenericNotificationListenerReg genericNotificationListenerReg = new GenericNotificationListenerReg(listener) {
+ @Override
+ public void close() {
+ synchronized (NetconfNotificationManager.this) {
+ LOG.trace("Notification listener unregistered for stream: {}", stream);
+ super.close();
+ }
+ }
+ };
+
+ notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg);
+ return genericNotificationListenerReg;
+ }
+
+ @Override
+ public synchronized Streams getNotificationPublishers() {
+ return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build();
+ }
+
+ @Override
+ public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) {
+ return availableStreams.contains(streamNameType);
+ }
+
+ @Override
+ public synchronized NotificationRegistration registerStreamListener(final NetconfNotificationStreamListener listener) {
+ streamListeners.add(listener);
+
+ // Notify about all already available
+ for (final Stream availableStream : streamMetadata.values()) {
+ listener.onStreamRegistered(availableStream);
+ }
+
+ return new NotificationRegistration() {
+ @Override
+ public void close() {
+ synchronized(NetconfNotificationManager.this) {
+ streamListeners.remove(listener);
+ }
+ }
+ };
+ }
+
+ @Override
+ public synchronized void close() {
+ // Unregister all listeners
+ for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) {
+ genericNotificationListenerReg.close();
+ }
+ notificationListeners.clear();
+
+ // Unregister all publishers
+ for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) {
+ notificationPublisher.close();
+ }
+ notificationPublishers.clear();
+
+ // Clear stream Listeners
+ streamListeners.clear();
+ }
+
+ @Override
+ public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
+ Preconditions.checkNotNull(stream);
+ final StreamNameType streamName = stream.getName();
+
+ LOG.debug("Notification publisher registered for stream: {}", streamName);
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Notification publisher registered for stream: {}", stream);
+ }
+
+ if(streamMetadata.containsKey(streamName)) {
+ LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, streamMetadata.get(streamName));
+ } else {
+ streamMetadata.put(streamName, stream);
+ }
+
+ availableStreams.add(streamName);
+
+ final GenericNotificationPublisherReg genericNotificationPublisherReg = new GenericNotificationPublisherReg(this, streamName) {
+ @Override
+ public void close() {
+ synchronized (NetconfNotificationManager.this) {
+ super.close();
+ }
+ }
+ };
+
+ notificationPublishers.add(genericNotificationPublisherReg);
+
+ notifyStreamAdded(stream);
+ return genericNotificationPublisherReg;
+ }
+
+ private void unregisterNotificationPublisher(final StreamNameType streamName, final GenericNotificationPublisherReg genericNotificationPublisherReg) {
+ availableStreams.remove(streamName);
+ notificationPublishers.remove(genericNotificationPublisherReg);
+
+ LOG.debug("Notification publisher unregistered for stream: {}", streamName);
+
+ // Notify stream listeners if all publishers are gone and also clear metadata for stream
+ if (!isStreamAvailable(streamName)) {
+ LOG.debug("Notification stream: {} became unavailable", streamName);
+ streamMetadata.remove(streamName);
+ notifyStreamRemoved(streamName);
+ }
+ }
+
+ private synchronized void notifyStreamAdded(final Stream stream) {
+ for (final NetconfNotificationStreamListener streamListener : streamListeners) {
+ streamListener.onStreamRegistered(stream);
+ }
+ }
+ private synchronized void notifyStreamRemoved(final StreamNameType stream) {
+ for (final NetconfNotificationStreamListener streamListener : streamListeners) {
+ streamListener.onStreamUnregistered(stream);
+ }
+ }
+
+ @Override
+ public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() {
+ final NotificationPublisherRegistration notificationPublisherRegistration = registerNotificationPublisher(BASE_NETCONF_STREAM);
+ return new BaseNotificationPublisherReg(notificationPublisherRegistration);
+ }
+
+ private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
+ private NetconfNotificationManager baseListener;
+ private final StreamNameType registeredStream;
+
+ public GenericNotificationPublisherReg(final NetconfNotificationManager baseListener, final StreamNameType registeredStream) {
+ this.baseListener = baseListener;
+ this.registeredStream = registeredStream;
+ }
+
+ @Override
+ public void close() {
+ baseListener.unregisterNotificationPublisher(registeredStream, this);
+ baseListener = null;
+ }
+
+ @Override
+ public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
+ Preconditions.checkState(baseListener != null, "Already closed");
+ Preconditions.checkArgument(stream.equals(registeredStream));
+ baseListener.onNotification(stream, notification);
+ }
+ }
+
+ private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration {
+
+ private final NotificationPublisherRegistration baseRegistration;
+
+ public BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) {
+ this.baseRegistration = baseRegistration;
+ }
+
+ @Override
+ public void close() {
+ baseRegistration.close();
+ }
+
+ @Override
+ public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) {
+ baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(capabilityChange));
+ }
+
+ private static NetconfNotification serializeNotification(final NetconfCapabilityChange capabilityChange) {
+ return NotificationsTransformUtil.transform(capabilityChange);
+ }
+ }
+
+ private class GenericNotificationListenerReg implements NotificationListenerRegistration {
+ private final NetconfNotificationListener listener;
+
+ public GenericNotificationListenerReg(final NetconfNotificationListener listener) {
+ this.listener = listener;
+ }
+
+ public NetconfNotificationListener getListener() {
+ return listener;
+ }
+
+ @Override
+ public void close() {
+ notificationListeners.remove(BASE_STREAM_NAME, this);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.SessionAwareNetconfOperation;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.controller.netconf.notifications.impl.NetconfNotificationManager;
+import org.opendaylight.controller.netconf.util.mapping.AbstractLastNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Create subscription listens for create subscription requests and registers notification listeners into notification registry.
+ * Received notifications are sent to the client right away
+ */
+public class CreateSubscription extends AbstractLastNetconfOperation implements SessionAwareNetconfOperation, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CreateSubscription.class);
+
+ static final String CREATE_SUBSCRIPTION = "create-subscription";
+
+ private final NetconfNotificationRegistry notifications;
+ private final List<NotificationListenerRegistration> subscriptions = Lists.newArrayList();
+ private NetconfSession netconfSession;
+
+ public CreateSubscription(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notifications) {
+ super(netconfSessionIdForReporting);
+ this.notifications = notifications;
+ }
+
+ @Override
+ protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException {
+ operationElement.checkName(CREATE_SUBSCRIPTION);
+ operationElement.checkNamespace(CreateSubscriptionInput.QNAME.getNamespace().toString());
+ // FIXME reimplement using CODEC_REGISTRY and parse everything into generated class instance
+ // Waiting ofr https://git.opendaylight.org/gerrit/#/c/13763/
+
+ // FIXME filter could be supported same way as netconf server filters get and get-config results
+ final Optional<XmlElement> filter = operationElement.getOnlyChildElementWithSameNamespaceOptionally("filter");
+ Preconditions.checkArgument(filter.isPresent() == false, "Filter element not yet supported");
+
+ // Replay not supported
+ final Optional<XmlElement> startTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("startTime");
+ Preconditions.checkArgument(startTime.isPresent() == false, "StartTime element not yet supported");
+
+ // Stop time not supported
+ final Optional<XmlElement> stopTime = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stopTime");
+ Preconditions.checkArgument(stopTime.isPresent() == false, "StopTime element not yet supported");
+
+ final StreamNameType streamNameType = parseStreamIfPresent(operationElement);
+
+ Preconditions.checkNotNull(netconfSession);
+ // Premature streams are allowed (meaning listener can register even if no provider is available yet)
+ if(notifications.isStreamAvailable(streamNameType) == false) {
+ LOG.warn("Registering premature stream {}. No publisher available yet for session {}", streamNameType, getNetconfSessionIdForReporting());
+ }
+
+ final NotificationListenerRegistration notificationListenerRegistration =
+ notifications.registerNotificationListener(streamNameType, new NotificationSubscription(netconfSession));
+ subscriptions.add(notificationListenerRegistration);
+
+ return XmlUtil.createElement(document, XmlNetconfConstants.OK, Optional.<String>absent());
+ }
+
+ private StreamNameType parseStreamIfPresent(final XmlElement operationElement) throws NetconfDocumentedException {
+ final Optional<XmlElement> stream = operationElement.getOnlyChildElementWithSameNamespaceOptionally("stream");
+ return stream.isPresent() ? new StreamNameType(stream.get().getTextContent()) : NetconfNotificationManager.BASE_STREAM_NAME;
+ }
+
+ @Override
+ protected String getOperationName() {
+ return CREATE_SUBSCRIPTION;
+ }
+
+ @Override
+ protected String getOperationNamespace() {
+ return CreateSubscriptionInput.QNAME.getNamespace().toString();
+ }
+
+ @Override
+ public void setSession(final NetconfSession session) {
+ this.netconfSession = session;
+ }
+
+ @Override
+ public void close() {
+ netconfSession = null;
+ // Unregister from notification streams
+ for (final NotificationListenerRegistration subscription : subscriptions) {
+ subscription.close();
+ }
+ }
+
+ private static class NotificationSubscription implements NetconfNotificationListener {
+ private final NetconfSession currentSession;
+
+ public NotificationSubscription(final NetconfSession currentSession) {
+ this.currentSession = currentSession;
+ }
+
+ @Override
+ public void onNotification(final StreamNameType stream, final NetconfNotification notification) {
+ currentSession.sendMessage(notification);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.NetconfBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Serialize the subtree for netconf notifications into the response of get rpc.
+ * This operation just adds its subtree into the common response of get rpc.
+ */
+public class Get extends AbstractNetconfOperation implements AutoCloseable {
+
+ private static final String GET = "get";
+ private static final InstanceIdentifier<Netconf> NETCONF_SUBTREE_INSTANCE_IDENTIFIER = InstanceIdentifier.builder(Netconf.class).build();
+
+ private final NetconfNotificationRegistry notificationRegistry;
+
+ public Get(final String netconfSessionIdForReporting, final NetconfNotificationRegistry notificationRegistry) {
+ super(netconfSessionIdForReporting);
+ Preconditions.checkNotNull(notificationRegistry);
+ this.notificationRegistry = notificationRegistry;
+ }
+
+ @Override
+ protected String getOperationName() {
+ return GET;
+ }
+
+ @Override
+ public Document handle(final Document requestMessage, final NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
+ final Document partialResponse = subsequentOperation.execute(requestMessage);
+ final Streams availableStreams = notificationRegistry.getNotificationPublishers();
+ if(availableStreams.getStream().isEmpty() == false) {
+ serializeStreamsSubtree(partialResponse, availableStreams);
+ }
+ return partialResponse;
+ }
+
+ static void serializeStreamsSubtree(final Document partialResponse, final Streams availableStreams) throws NetconfDocumentedException {
+ final Netconf netconfSubtree = new NetconfBuilder().setStreams(availableStreams).build();
+ final NormalizedNode<?, ?> normalized = toNormalized(netconfSubtree);
+
+ final DOMResult result = new DOMResult(getPlaceholder(partialResponse));
+
+ try {
+ NotificationsTransformUtil.writeNormalizedNode(normalized, result, SchemaPath.ROOT);
+ } catch (final XMLStreamException | IOException e) {
+ throw new IllegalStateException("Unable to serialize " + netconfSubtree, e);
+ }
+ }
+
+ private static Element getPlaceholder(final Document innerResult)
+ throws NetconfDocumentedException {
+ final XmlElement rootElement = XmlElement.fromDomElementWithExpected(
+ innerResult.getDocumentElement(), XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.RFC4741_TARGET_NAMESPACE);
+ return rootElement.getOnlyChildElement(XmlNetconfConstants.DATA_KEY).getDomElement();
+ }
+
+ private static NormalizedNode<?, ?> toNormalized(final Netconf netconfSubtree) {
+ return NotificationsTransformUtil.CODEC_REGISTRY.toNormalizedNode(NETCONF_SUBTREE_INSTANCE_IDENTIFIER, netconfSubtree).getValue();
+ }
+
+ @Override
+ protected Element handle(final Document document, final XmlElement message, final NetconfOperationChainedExecution subsequentOperation)
+ throws NetconfDocumentedException {
+ throw new UnsupportedOperationException("Never gets called");
+ }
+
+ @Override
+ protected HandlingPriority getHandlingPriority() {
+ return HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY.increasePriority(2);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import javassist.ClassPool;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.dom.DOMResult;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+public final class NotificationsTransformUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationsTransformUtil.class);
+
+ private NotificationsTransformUtil() {}
+
+ static final SchemaContext NOTIFICATIONS_SCHEMA_CTX;
+ static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
+ static final XMLOutputFactory XML_FACTORY;
+ static final RpcDefinition CREATE_SUBSCRIPTION_RPC;
+
+ static final SchemaPath CAPABILITY_CHANGE_SCHEMA_PATH = SchemaPath.create(true, NetconfCapabilityChange.QNAME);
+
+ static {
+ XML_FACTORY = XMLOutputFactory.newFactory();
+ XML_FACTORY.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
+
+ final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+ moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
+ moduleInfoBackedContext.addModuleInfos(Collections.singletonList(org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.$YangModuleInfoImpl.getInstance()));
+ final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
+ Preconditions.checkState(schemaContextOptional.isPresent());
+ NOTIFICATIONS_SCHEMA_CTX = schemaContextOptional.get();
+
+ CREATE_SUBSCRIPTION_RPC = Preconditions.checkNotNull(findCreateSubscriptionRpc());
+
+ Preconditions.checkNotNull(CREATE_SUBSCRIPTION_RPC);
+
+ final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
+ CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
+ CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, NOTIFICATIONS_SCHEMA_CTX));
+ }
+
+ private static RpcDefinition findCreateSubscriptionRpc() {
+ return Iterables.getFirst(Collections2.filter(NOTIFICATIONS_SCHEMA_CTX.getOperations(), new Predicate<RpcDefinition>() {
+ @Override
+ public boolean apply(final RpcDefinition input) {
+ return input.getQName().getLocalName().equals(CreateSubscription.CREATE_SUBSCRIPTION);
+ }
+ }), null);
+ }
+
+ /**
+ * Transform base notification for capabilities into NetconfNotification
+ */
+ public static NetconfNotification transform(final NetconfCapabilityChange capabilityChange) {
+ return transform(capabilityChange, Optional.<Date>absent());
+ }
+
+ public static NetconfNotification transform(final NetconfCapabilityChange capabilityChange, final Date eventTime) {
+ return transform(capabilityChange, Optional.fromNullable(eventTime));
+ }
+
+ private static NetconfNotification transform(final NetconfCapabilityChange capabilityChange, final Optional<Date> eventTime) {
+ final ContainerNode containerNode = CODEC_REGISTRY.toNormalizedNodeNotification(capabilityChange);
+ final DOMResult result = new DOMResult(XmlUtil.newDocument());
+ try {
+ writeNormalizedNode(containerNode, result, CAPABILITY_CHANGE_SCHEMA_PATH);
+ } catch (final XMLStreamException| IOException e) {
+ throw new IllegalStateException("Unable to serialize " + capabilityChange, e);
+ }
+ final Document node = (Document) result.getNode();
+ return eventTime.isPresent() ?
+ new NetconfNotification(node, eventTime.get()):
+ new NetconfNotification(node);
+ }
+
+ static void writeNormalizedNode(final NormalizedNode<?, ?> normalized, final DOMResult result, final SchemaPath schemaPath) throws IOException, XMLStreamException {
+ NormalizedNodeWriter normalizedNodeWriter = null;
+ NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+ XMLStreamWriter writer = null;
+ try {
+ writer = XML_FACTORY.createXMLStreamWriter(result);
+ normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, NOTIFICATIONS_SCHEMA_CTX, schemaPath);
+ normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+ normalizedNodeWriter.write(normalized);
+
+ normalizedNodeWriter.flush();
+ } finally {
+ try {
+ if(normalizedNodeWriter != null) {
+ normalizedNodeWriter.close();
+ }
+ if(normalizedNodeStreamWriter != null) {
+ normalizedNodeStreamWriter.close();
+ }
+ if(writer != null) {
+ writer.close();
+ }
+ } catch (final Exception e) {
+ LOG.warn("Unable to close resource properly", e);
+ }
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.osgi;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.Set;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.controller.netconf.notifications.impl.NetconfNotificationManager;
+import org.opendaylight.controller.netconf.notifications.impl.ops.CreateSubscription;
+import org.opendaylight.controller.netconf.notifications.impl.ops.Get;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+public class Activator implements BundleActivator {
+
+ private ServiceRegistration<NetconfNotificationCollector> netconfNotificationCollectorServiceRegistration;
+ private ServiceRegistration<NetconfOperationServiceFactory> operationaServiceRegistration;
+ private NetconfNotificationManager netconfNotificationManager;
+
+ @Override
+ public void start(final BundleContext context) throws Exception {
+ netconfNotificationManager = new NetconfNotificationManager();
+ netconfNotificationCollectorServiceRegistration = context.registerService(NetconfNotificationCollector.class, netconfNotificationManager, new Hashtable<String, Object>());
+
+ final NetconfOperationServiceFactory netconfOperationServiceFactory = new NetconfOperationServiceFactory() {
+
+ @Override
+ public NetconfOperationService createService(final String netconfSessionIdForReporting) {
+ return new NetconfOperationService() {
+
+ private final CreateSubscription createSubscription = new CreateSubscription(netconfSessionIdForReporting, netconfNotificationManager);
+
+ @Override
+ public Set<Capability> getCapabilities() {
+ return Collections.<Capability>singleton(new NotificationsCapability());
+ }
+
+ @Override
+ public Set<NetconfOperation> getNetconfOperations() {
+ return Sets.<NetconfOperation>newHashSet(
+ new Get(netconfSessionIdForReporting, netconfNotificationManager),
+ createSubscription);
+ }
+
+ @Override
+ public void close() {
+ createSubscription.close();
+ }
+ };
+ }
+ };
+
+ operationaServiceRegistration = context.registerService(NetconfOperationServiceFactory.class, netconfOperationServiceFactory, new Hashtable<String, Object>());
+
+ }
+
+ @Override
+ public void stop(final BundleContext context) throws Exception {
+ if(netconfNotificationCollectorServiceRegistration != null) {
+ netconfNotificationCollectorServiceRegistration.unregister();
+ netconfNotificationCollectorServiceRegistration = null;
+ }
+ if (netconfNotificationManager != null) {
+ netconfNotificationManager.close();
+ }
+ if (operationaServiceRegistration != null) {
+ operationaServiceRegistration.unregister();
+ operationaServiceRegistration = null;
+ }
+ }
+
+ private class NotificationsCapability implements Capability {
+ @Override
+ public String getCapabilityUri() {
+ return NetconfNotification.NOTIFICATION_NAMESPACE;
+ }
+
+ @Override
+ public Optional<String> getModuleNamespace() {
+ return Optional.absent();
+ }
+
+ @Override
+ public Optional<String> getModuleName() {
+ return Optional.absent();
+ }
+
+ @Override
+ public Optional<String> getRevision() {
+ return Optional.absent();
+ }
+
+ @Override
+ public Optional<String> getCapabilitySchema() {
+ return Optional.absent();
+ }
+
+ @Override
+ public Collection<String> getLocation() {
+ return Collections.emptyList();
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+
+public class NetconfNotificationManagerTest {
+
+ @Mock
+ private NetconfNotificationRegistry notificationRegistry;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testNotificationListeners() throws Exception {
+ final NetconfNotificationManager netconfNotificationManager = new NetconfNotificationManager();
+ final BaseNotificationPublisherRegistration baseNotificationPublisherRegistration =
+ netconfNotificationManager.registerBaseNotificationPublisher();
+
+ final NetconfCapabilityChangeBuilder capabilityChangedBuilder = new NetconfCapabilityChangeBuilder();
+
+ final NetconfNotificationListener listener = mock(NetconfNotificationListener.class);
+ doNothing().when(listener).onNotification(any(StreamNameType.class), any(NetconfNotification.class));
+ final NotificationListenerRegistration notificationListenerRegistration = netconfNotificationManager.registerNotificationListener(NetconfNotificationManager.BASE_NETCONF_STREAM.getName(), listener);
+ final NetconfCapabilityChange notification = capabilityChangedBuilder.build();
+ baseNotificationPublisherRegistration.onCapabilityChanged(notification);
+
+ verify(listener).onNotification(any(StreamNameType.class), any(NetconfNotification.class));
+
+ notificationListenerRegistration.close();
+
+ baseNotificationPublisherRegistration.onCapabilityChanged(notification);
+ verifyNoMoreInteractions(listener);
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ final NetconfNotificationManager netconfNotificationManager = new NetconfNotificationManager();
+
+ final BaseNotificationPublisherRegistration baseNotificationPublisherRegistration = netconfNotificationManager.registerBaseNotificationPublisher();
+
+ final NetconfNotificationListener listener = mock(NetconfNotificationListener.class);
+ doNothing().when(listener).onNotification(any(StreamNameType.class), any(NetconfNotification.class));
+
+ netconfNotificationManager.registerNotificationListener(NetconfNotificationManager.BASE_NETCONF_STREAM.getName(), listener);
+
+ final NetconfNotificationCollector.NetconfNotificationStreamListener streamListener =
+ mock(NetconfNotificationCollector.NetconfNotificationStreamListener.class);
+ doNothing().when(streamListener).onStreamUnregistered(any(StreamNameType.class));
+ doNothing().when(streamListener).onStreamRegistered(any(Stream.class));
+ netconfNotificationManager.registerStreamListener(streamListener);
+
+ verify(streamListener).onStreamRegistered(NetconfNotificationManager.BASE_NETCONF_STREAM);
+
+ netconfNotificationManager.close();
+
+ verify(streamListener).onStreamUnregistered(NetconfNotificationManager.BASE_NETCONF_STREAM.getName());
+
+ try {
+ baseNotificationPublisherRegistration.onCapabilityChanged(new NetconfCapabilityChangeBuilder().build());
+ } catch (final IllegalStateException e) {
+ // Exception should be thrown after manager is closed
+ return;
+ }
+
+ fail("Publishing into a closed manager should fail");
+ }
+
+ @Test
+ public void testStreamListeners() throws Exception {
+ final NetconfNotificationManager netconfNotificationManager = new NetconfNotificationManager();
+
+ final NetconfNotificationCollector.NetconfNotificationStreamListener streamListener = mock(NetconfNotificationCollector.NetconfNotificationStreamListener.class);
+ doNothing().when(streamListener).onStreamRegistered(any(Stream.class));
+ doNothing().when(streamListener).onStreamUnregistered(any(StreamNameType.class));
+
+ netconfNotificationManager.registerStreamListener(streamListener);
+
+ final BaseNotificationPublisherRegistration baseNotificationPublisherRegistration =
+ netconfNotificationManager.registerBaseNotificationPublisher();
+
+ verify(streamListener).onStreamRegistered(NetconfNotificationManager.BASE_NETCONF_STREAM);
+
+
+ baseNotificationPublisherRegistration.close();
+
+ verify(streamListener).onStreamUnregistered(NetconfNotificationManager.BASE_STREAM_NAME);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener;
+import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry;
+import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.w3c.dom.Element;
+
+public class CreateSubscriptionTest {
+
+ private static final String CREATE_SUBSCRIPTION_XML = "<create-subscription\n" +
+ "xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\" xmlns:netconf=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<stream>TESTSTREAM</stream>" +
+ "</create-subscription>";
+
+ @Mock
+ private NetconfNotificationRegistry notificationRegistry;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ doReturn(true).when(notificationRegistry).isStreamAvailable(any(StreamNameType.class));
+ doReturn(mock(NotificationListenerRegistration.class)).when(notificationRegistry).registerNotificationListener(any(StreamNameType.class), any(NetconfNotificationListener.class));
+ }
+
+ @Test
+ public void testHandleWithNoSubsequentOperations() throws Exception {
+ final CreateSubscription createSubscription = new CreateSubscription("id", notificationRegistry);
+ createSubscription.setSession(mock(NetconfSession.class));
+
+ final Element e = XmlUtil.readXmlToElement(CREATE_SUBSCRIPTION_XML);
+
+ final XmlElement operationElement = XmlElement.fromDomElement(e);
+ final Element element = createSubscription.handleWithNoSubsequentOperations(XmlUtil.newDocument(), operationElement);
+
+ Assert.assertThat(XmlUtil.toString(element), CoreMatchers.containsString("ok"));
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.notifications.impl.ops.Get;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+public class GetTest {
+
+ @Test
+ public void testSerializeStreamsSubtree() throws Exception {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ final StreamBuilder streamBuilder = new StreamBuilder();
+ final StreamNameType base = new StreamNameType("base");
+ streamBuilder.setName(base);
+ streamBuilder.setKey(new StreamKey(base));
+ streamBuilder.setDescription("description");
+ streamBuilder.setReplaySupport(false);
+ streamsBuilder.setStream(Lists.newArrayList(streamBuilder.build()));
+ final Streams streams = streamsBuilder.build();
+
+ final Document response = getBlankResponse();
+ Get.serializeStreamsSubtree(response, streams);
+ final Diff diff = XMLUnit.compareXML(XmlUtil.toString(response),
+ "<rpc-reply message-id=\"101\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<data>\n" +
+ "<netconf xmlns=\"urn:ietf:params:xml:ns:netmod:notification\">\n" +
+ "<streams>\n" +
+ "<stream>\n" +
+ "<name>base</name>\n" +
+ "<description>description</description>\n" +
+ "<replaySupport>false</replaySupport>\n" +
+ "</stream>\n" +
+ "</streams>\n" +
+ "</netconf>\n" +
+ "</data>\n" +
+ "</rpc-reply>\n");
+
+ assertTrue(diff.toString(), diff.identical());
+ }
+
+ private Document getBlankResponse() throws IOException, SAXException {
+
+ return XmlUtil.readXmlToDocument("<rpc-reply message-id=\"101\"\n" +
+ "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<data>\n" +
+ "</data>\n" +
+ "</rpc-reply>");
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.notifications.impl.ops;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.notifications.NetconfNotification;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChangeBuilder;
+
+public class NotificationsTransformUtilTest {
+
+ private static final Date DATE = new Date();
+ private static final String innerNotification = "<netconf-capability-change xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-notifications\">" +
+ "<deleted-capability>uri3</deleted-capability>" +
+ "<deleted-capability>uri4</deleted-capability>" +
+ "<added-capability>uri1</added-capability>" +
+ "</netconf-capability-change>";
+
+ private static final String expectedNotification = "<notification xmlns=\"urn:ietf:params:netconf:capability:notification:1.0\">" +
+ innerNotification +
+ "<eventTime>" + new SimpleDateFormat(NetconfNotification.RFC3339_DATE_FORMAT_BLUEPRINT).format(DATE) + "</eventTime>" +
+ "</notification>";
+
+ @Test
+ public void testTransform() throws Exception {
+ final NetconfCapabilityChangeBuilder netconfCapabilityChangeBuilder = new NetconfCapabilityChangeBuilder();
+
+ netconfCapabilityChangeBuilder.setAddedCapability(Lists.newArrayList(new Uri("uri1"), new Uri("uri1")));
+ netconfCapabilityChangeBuilder.setDeletedCapability(Lists.newArrayList(new Uri("uri3"), new Uri("uri4")));
+
+ final NetconfCapabilityChange capabilityChange = netconfCapabilityChangeBuilder.build();
+ final NetconfNotification transform = NotificationsTransformUtil.transform(capabilityChange, DATE);
+
+ final String serialized = XmlUtil.toString(transform.getDocument());
+
+ XMLUnit.setIgnoreWhitespace(true);
+ final Diff diff = XMLUnit.compareXML(expectedNotification, serialized);
+ assertTrue(diff.toString(), diff.similar());
+ }
+
+ @Test
+ public void testTransformFromDOM() throws Exception {
+ final NetconfNotification netconfNotification = new NetconfNotification(XmlUtil.readXmlToDocument(innerNotification), DATE);
+
+ XMLUnit.setIgnoreWhitespace(true);
+ final Diff diff = XMLUnit.compareXML(expectedNotification, netconfNotification.toString());
+ assertTrue(diff.toString(), diff.similar());
+ }
+
+}
\ No newline at end of file
<version>0.3.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
- <prerequisites>
- <maven>3.0.4</maven>
- </prerequisites>
<modules>
<module>netconf-api</module>
<module>netconf-ssh</module>
<module>netconf-tcp</module>
<module>netconf-monitoring</module>
+ <module>ietf-netconf</module>
<module>ietf-netconf-monitoring</module>
+ <module>ietf-netconf-notifications</module>
<module>ietf-netconf-monitoring-extension</module>
<module>netconf-connector-config</module>
<module>netconf-auth</module>
<module>netconf-usermanager</module>
<module>netconf-testtool</module>
+ <module>netconf-notifications-impl</module>
+ <module>netconf-notifications-api</module>
<module>netconf-artifacts</module>
</modules>
<name>controller</name>
<!-- Used by Sonar to set project name -->
- <prerequisites>
- <maven>3.0</maven>
- </prerequisites>
-
<modules>
<!-- md-sal -->