<groupId>org.opendaylight.controller.thirdparty</groupId>
<artifactId>net.sf.jung2</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.persistence</groupId>
+ <artifactId>org.eclipse.persistence.antlr</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.persistence</groupId>
+ <artifactId>org.eclipse.persistence.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.persistence</groupId>
+ <artifactId>org.eclipse.persistence.moxy</artifactId>
+ </dependency>
</dependencies>
<build>
<resources>
<bundle>mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}</bundle>
+ <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}</bundle>
+ <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}</bundle>
+ <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}</bundle>
<bundle>mvn:org.opendaylight.controller/networkconfig.neutron.northbound/${networkconfig.neutron.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}</bundle>
<sonar.language>java</sonar.language>
<sonar.jacoco.reportPath>target/code-coverage/jacoco.exec</sonar.jacoco.reportPath>
<sonar.jacoco.itReportPath>target/code-coverage/jacoco-it.exec</sonar.jacoco.itReportPath>
- <sonar.skippedModules>org.openflow.openflowj,net.sf.jung2,org.opendaylight.controller.protobuff.messages</sonar.skippedModules>
+ <sonar.skippedModules>org.openflow.openflowj,net.sf.jung2,org.opendaylight.controller.protobuff.messages,ch.ethz.ssh2</sonar.skippedModules>
+ <sonar.profile>Sonar way with Findbugs</sonar.profile>
<spifly.version>1.0.0</spifly.version>
<spring-osgi.version>1.2.1</spring-osgi.version>
<spring-security-karaf.version>3.1.4.RELEASE</spring-security-karaf.version>
if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
builder.setFlowRef(new FlowRef(identifier));
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalFlowService().removeFlow(builder.build());
if (tableIdValidationPrecondition(tableKey, update)) {
final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setFlowRef(new FlowRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
builder.setUpdatedFlow((new UpdatedFlowBuilder(update)).build());
if (tableIdValidationPrecondition(tableKey, addDataObj)) {
final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setFlowRef(new FlowRef(identifier));
builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
final Group group = (removeDataObj);
final RemoveGroupInputBuilder builder = new RemoveGroupInputBuilder(group);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setGroupRef(new GroupRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalGroupService().removeGroup(builder.build());
final Group updatedGroup = (update);
final UpdateGroupInputBuilder builder = new UpdateGroupInputBuilder();
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setGroupRef(new GroupRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
builder.setUpdatedGroup((new UpdatedGroupBuilder(updatedGroup)).build());
final Group group = (addDataObj);
final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setGroupRef(new GroupRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalGroupService().addGroup(builder.build());
final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setMeterRef(new MeterRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalMeterService().removeMeter(builder.build());
final UpdateMeterInputBuilder builder = new UpdateMeterInputBuilder();
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setMeterRef(new MeterRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
builder.setUpdatedMeter((new UpdatedMeterBuilder(update)).build());
final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj);
- builder.setNode(new NodeRef(nodeIdent));
+ builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
builder.setMeterRef(new MeterRef(identifier));
builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
this.provider.getSalMeterService().addMeter(builder.build());
*/
public class DefaultConfigParamsImpl implements ConfigParams {
- private static final int SNAPSHOT_BATCH_COUNT = 100000;
+ private static final int SNAPSHOT_BATCH_COUNT = 20000;
/**
* The maximum election time variance
@Override public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
- LOG.debug("SnapshotOffer called..");
+ LOG.info("SnapshotOffer called..");
SnapshotOffer offer = (SnapshotOffer) message;
Snapshot snapshot = (Snapshot) offer.snapshot();
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
- LOG.debug("Applied snapshot to replicatedLog. " +
- "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ LOG.info("Applied snapshot to replicatedLog. " +
+ "snapshotIndex={}, snapshotTerm={}, journal-size={}",
replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
- replicatedLog.size());
+ replicatedLog.size()
+ );
// Apply the snapshot to the actors state
applySnapshot(ByteString.copyFrom(snapshot.getState()));
context.removePeer(rrp.getName());
} else if (message instanceof CaptureSnapshot) {
- LOG.debug("CaptureSnapshot received by actor");
+ LOG.info("CaptureSnapshot received by actor");
CaptureSnapshot cs = (CaptureSnapshot)message;
captureSnapshot = cs;
createSnapshot();
} else if (message instanceof CaptureSnapshotReply){
- LOG.debug("CaptureSnapshotReply received by actor");
+ LOG.info("CaptureSnapshotReply received by actor");
CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
ByteString stateInBytes = csr.getSnapshot();
- LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+ LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
handleCaptureSnapshotReply(stateInBytes);
} else {
package org.opendaylight.controller.cluster.raft.messages;
import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
public class InstallSnapshot extends AbstractRaftRPC {
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
-import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: InstallSnapshot.proto
-package org.opendaylight.controller.cluster.raft.protobuff.messages;
+package org.opendaylight.controller.protobuff.messages.cluster.raft;
public final class InstallSnapshotMessages {
private InstallSnapshotMessages() {}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class);
}
public static com.google.protobuf.Parser<InstallSnapshot> PARSER =
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
- com.google.protobuf.ByteString bs =
+ com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
getLeaderIdBytes() {
java.lang.Object ref = leaderId_;
if (ref instanceof java.lang.String) {
- com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
leaderId_ = b;
return super.writeReplace();
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(
+ public static org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
- public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) {
+ public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
- implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder {
+ implements org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshotOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class);
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.Builder.class);
}
- // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder()
+ // Construct using org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor;
}
- public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
- return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() {
+ return org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance();
}
- public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() {
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot build() {
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
- public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() {
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this);
+ public org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot buildPartial() {
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
}
public Builder mergeFrom(com.google.protobuf.Message other) {
- if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) {
- return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other);
+ if (other instanceof org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) {
+ return mergeFrom((org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot)other);
} else {
super.mergeFrom(other);
return this;
}
}
- public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) {
- if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
+ public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot other) {
+ if (other == org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this;
if (other.hasTerm()) {
setTerm(other.getTerm());
}
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
- org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
+ org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
- parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
+ parsedMessage = (org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
getLeaderIdBytes() {
java.lang.Object ref = leaderId_;
if (ref instanceof String) {
- com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
leaderId_ = b;
"\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
"Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
" \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
- "light.controller.cluster.raft.protobuff." +
- "messagesB\027InstallSnapshotMessagesH\001"
+ "light.controller.protobuff.messages.clus" +
+ "ter.raftB\027InstallSnapshotMessagesH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
package org.opendaylight.controller.cluster.raft;
-option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+option java_package = "org.opendaylight.controller.protobuff.messages.cluster.raft";
option java_outer_classname = "InstallSnapshotMessages";
option optimize_for = SPEED;
package org.opendaylight.controller.md.sal.common.api.data;
+import com.google.common.base.Supplier;
import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-
-import com.google.common.base.Function;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
/**
* A type of TransactionCommitFailedException that indicates a situation that would result in a
* @author Thomas Pantelis
*/
public class TransactionCommitDeadlockException extends TransactionCommitFailedException {
-
private static final long serialVersionUID = 1L;
-
private static final String DEADLOCK_MESSAGE =
"An attempt to block on a ListenableFuture via a get method from a write " +
"transaction submit was detected that would result in deadlock. The commit " +
"result must be obtained asynchronously, e.g. via Futures#addCallback, to avoid deadlock.";
+ private static final RpcError DEADLOCK_RPCERROR = RpcResultBuilder.newError(ErrorType.APPLICATION, "lock-denied", DEADLOCK_MESSAGE);
- public static Function<Void, Exception> DEADLOCK_EXECUTOR_FUNCTION = new Function<Void, Exception>() {
+ public static final Supplier<Exception> DEADLOCK_EXCEPTION_SUPPLIER = new Supplier<Exception>() {
@Override
- public Exception apply(Void notUsed) {
- return new TransactionCommitDeadlockException( DEADLOCK_MESSAGE,
- RpcResultBuilder.newError(ErrorType.APPLICATION, "lock-denied", DEADLOCK_MESSAGE));
+ public Exception get() {
+ return new TransactionCommitDeadlockException(DEADLOCK_MESSAGE, DEADLOCK_RPCERROR);
}
};
- public TransactionCommitDeadlockException(String message, final RpcError... errors) {
+ public TransactionCommitDeadlockException(final String message, final RpcError... errors) {
super(message, errors);
}
}
import javax.annotation.Nullable;
import org.opendaylight.yangtools.util.concurrent.CountingRejectedExecutionHandler;
import org.opendaylight.yangtools.util.concurrent.TrackingLinkedBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* MXBean implementation of the ThreadExecutorStatsMXBean interface that retrieves statistics
*/
public class ThreadExecutorStatsMXBeanImpl extends AbstractMXBean
implements ThreadExecutorStatsMXBean {
-
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadExecutorStatsMXBeanImpl.class);
private final ThreadPoolExecutor executor;
/**
* @param mBeanType Used as the <code>type</code> property in the bean's ObjectName.
* @param mBeanCategory Used as the <code>Category</code> property in the bean's ObjectName.
*/
- public ThreadExecutorStatsMXBeanImpl(Executor executor, String mBeanName,
- String mBeanType, @Nullable String mBeanCategory) {
+ public ThreadExecutorStatsMXBeanImpl(final ThreadPoolExecutor executor, final String mBeanName,
+ final String mBeanType, @Nullable final String mBeanCategory) {
super(mBeanName, mBeanType, mBeanCategory);
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+ /**
+ * Create a new bean for the statistics, which is already registered.
+ *
+ * @param executor
+ * @param mBeanName
+ * @param mBeanType
+ * @param mBeanCategory
+ * @return
+ */
+ public static ThreadExecutorStatsMXBeanImpl create(final Executor executor, final String mBeanName,
+ final String mBeanType, @Nullable final String mBeanCategory) {
+ if (executor instanceof ThreadPoolExecutor) {
+ final ThreadExecutorStatsMXBeanImpl ret = new ThreadExecutorStatsMXBeanImpl((ThreadPoolExecutor) executor, mBeanName, mBeanType, mBeanCategory);
+ ret.registerMBean();
+ return ret;
+ }
- Preconditions.checkArgument(executor instanceof ThreadPoolExecutor,
- "The ExecutorService of type {} is not an instanceof ThreadPoolExecutor",
- executor.getClass());
- this.executor = (ThreadPoolExecutor)executor;
+ LOG.info("Executor {} is not supported", executor);
+ return null;
}
@Override
.tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
self());
+ createSnapshotTransaction = null;
// Send a PoisonPill instead of sending close transaction because we do not really need
// a response
getSender().tell(PoisonPill.getInstance(), self());
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
+
+ LOG.info("Applying snapshot");
try {
DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
syncCommitTransaction(transaction);
} catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred when applying snapshot");
+ } finally {
+ LOG.info("Done applying snapshot");
}
}
}
public void setDataStoreExecutor(ExecutorService dsExecutor) {
- this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+ this.dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dsExecutor,
"notification-executor", getMBeanType(), getMBeanCategory());
}
this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", getMBeanType(), getMBeanCategory());
- this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
"data-store-executor", getMBeanType(), getMBeanCategory());
}
System.setProperty("shard.persistent", "false");
system = ActorSystem.create("test");
+
+ deletePersistenceFiles();
}
@AfterClass
public static void tearDownClass() throws IOException {
JavaTestKit.shutdownActorSystem(system);
system = null;
+
+ deletePersistenceFiles();
}
protected static void deletePersistenceFiles() throws IOException {
subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
getRef());
- waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+ subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+ getRef());
+
+ waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
}
};
- Thread.sleep(2000);
deletePersistenceFiles();
}};
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.collect.Iterables;
+import scala.concurrent.Future;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InMemorySnapshotStore extends SnapshotStore {
+
+ Map<String, List<Snapshot>> snapshots = new HashMap<>();
+
+ @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+ SnapshotSelectionCriteria snapshotSelectionCriteria) {
+ List<Snapshot> snapshotList = snapshots.get(s);
+ if(snapshotList == null){
+ return Futures.successful(Option.<SelectedSnapshot>none());
+ }
+
+ Snapshot snapshot = Iterables.getLast(snapshotList);
+ SelectedSnapshot selectedSnapshot =
+ new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
+ return Futures.successful(Option.some(selectedSnapshot));
+ }
+
+ @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+ List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+ if(snapshotList == null){
+ snapshotList = new ArrayList<>();
+ snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
+ }
+ snapshotList.add(new Snapshot(snapshotMetadata, o));
+
+ return Futures.successful(null);
+ }
+
+ @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+ }
+
+ @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+ List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+ if(snapshotList == null){
+ return;
+ }
+
+ int deleteIndex = -1;
+
+ for(int i=0;i<snapshotList.size(); i++){
+ Snapshot snapshot = snapshotList.get(i);
+ if(snapshotMetadata.equals(snapshot.getMetadata())){
+ deleteIndex = i;
+ break;
+ }
+ }
+
+ if(deleteIndex != -1){
+ snapshotList.remove(deleteIndex);
+ }
+
+ }
+
+ @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+ throws Exception {
+ List<Snapshot> snapshotList = snapshots.get(s);
+
+ if(snapshotList == null){
+ return;
+ }
+
+ // TODO : This is a quick and dirty implementation. Do actual match later.
+ snapshotList.clear();
+ snapshots.remove(s);
+ }
+
+ private static class Snapshot {
+ private final SnapshotMetadata metadata;
+ private final Object data;
+
+ private Snapshot(SnapshotMetadata metadata, Object data) {
+ this.metadata = metadata;
+ this.data = data;
+ }
+
+ public SnapshotMetadata getMetadata() {
+ return metadata;
+ }
+
+ public Object getData() {
+ return data;
+ }
+ }
+}
akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+
loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
actor {
}
}
}
+
+in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
*/
package org.opendaylight.controller.config.yang.md.sal.dom.impl;
+import java.util.EnumMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
-
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
import org.opendaylight.controller.md.sal.dom.broker.impl.jmx.CommitStatsMXBeanImpl;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import com.google.common.collect.ImmutableMap;
/**
*
//we will default to InMemoryDOMDataStore creation
configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
}
- ImmutableMap<LogicalDatastoreType, DOMStore> datastores = ImmutableMap
- .<LogicalDatastoreType, DOMStore> builder().put(LogicalDatastoreType.OPERATIONAL, operStore)
- .put(LogicalDatastoreType.CONFIGURATION, configStore).build();
+
+ final Map<LogicalDatastoreType, DOMStore> datastores = new EnumMap<>(LogicalDatastoreType.class);
+ datastores.put(LogicalDatastoreType.OPERATIONAL, operStore);
+ datastores.put(LogicalDatastoreType.CONFIGURATION, configStore);
/*
* We use a single-threaded executor for commits with a bounded queue capacity. If the
DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
new DeadlockDetectingListeningExecutorService(commitExecutor,
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
+ TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER,
listenableFutureExecutor));
final CommitStatsMXBeanImpl commitStatsMXBean = new CommitStatsMXBeanImpl(
newDataBroker.getCommitStatsTracker(), JMX_BEAN_TYPE);
commitStatsMXBean.registerMBean();
- final ThreadExecutorStatsMXBeanImpl commitExecutorStatsMXBean =
- new ThreadExecutorStatsMXBeanImpl(commitExecutor, "CommitExecutorStats",
+ final AbstractMXBean commitExecutorStatsMXBean =
+ ThreadExecutorStatsMXBeanImpl.create(commitExecutor, "CommitExecutorStats",
JMX_BEAN_TYPE, null);
- commitExecutorStatsMXBean.registerMBean();
-
- final ThreadExecutorStatsMXBeanImpl commitFutureStatsMXBean =
- new ThreadExecutorStatsMXBeanImpl(listenableFutureExecutor,
+ final AbstractMXBean commitFutureStatsMXBean =
+ ThreadExecutorStatsMXBeanImpl.create(listenableFutureExecutor,
"CommitFutureExecutorStats", JMX_BEAN_TYPE, null);
- commitFutureStatsMXBean.registerMBean();
newDataBroker.setCloseable(new AutoCloseable() {
@Override
public void close() {
commitStatsMXBean.unregisterMBean();
- commitExecutorStatsMXBean.unregisterMBean();
- commitFutureStatsMXBean.unregisterMBean();
+ if (commitExecutorStatsMXBean != null) {
+ commitExecutorStatsMXBean.unregisterMBean();
+ }
+ if (commitFutureStatsMXBean != null) {
+ commitFutureStatsMXBean.unregisterMBean();
+ }
}
});
package org.opendaylight.controller.md.sal.dom.broker.impl;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import java.util.EnumMap;
import java.util.Map;
import java.util.Map.Entry;
private volatile int closed = 0;
protected AbstractDOMForwardedTransactionFactory(final Map<LogicalDatastoreType, ? extends T> txFactories) {
- this.storeTxFactories = ImmutableMap.copyOf(txFactories);
+ this.storeTxFactories = new EnumMap<>(txFactories);
}
/**
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.EnumMap;
private final AtomicLong chainNum = new AtomicLong();
private volatile AutoCloseable closeable;
- public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
+ public DOMDataBrokerImpl(final Map<LogicalDatastoreType, DOMStore> datastores,
final ListeningExecutorService executor) {
super(datastores);
this.coordinator = new DOMDataCommitCoordinatorImpl(executor);
package org.opendaylight.controller.md.sal.dom.broker.impl;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMBrokerTest {
private SchemaContext schemaContext;
commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
+ TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
domBroker = new DOMDataBrokerImpl(stores, executor);
}
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( Void result ) {
+ public void onSuccess( final Void result ) {
commitCompletedLatch.countDown();
}
@Override
- public void onFailure( Throwable t ) {
+ public void onFailure( final Throwable t ) {
caughtCommitEx.set( t );
commitCompletedLatch.countDown();
}
TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
private final CountDownLatch latch = new CountDownLatch( 1 );
@Override
- public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+ public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
this.change = change;
latch.countDown();
}
ExecutorService delegate;
- public CommitExecutorService( ExecutorService delegate ) {
+ public CommitExecutorService( final ExecutorService delegate ) {
this.delegate = delegate;
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
- private final ListeningExecutorService listeningExecutor;
private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
private final ExecutorService dataChangeListenerExecutor;
-
- private final ExecutorService domStoreExecutor;
+ private final ListeningExecutorService commitExecutor;
private final boolean debugTransactions;
private final String name;
private volatile AutoCloseable closeable;
- public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
final ExecutorService dataChangeListenerExecutor) {
- this(name, domStoreExecutor, dataChangeListenerExecutor,
+ this(name, commitExecutor, dataChangeListenerExecutor,
InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
}
- public InMemoryDOMDataStore(final String name, final ExecutorService domStoreExecutor,
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
final boolean debugTransactions) {
this.name = Preconditions.checkNotNull(name);
- this.domStoreExecutor = Preconditions.checkNotNull(domStoreExecutor);
- this.listeningExecutor = MoreExecutors.listeningDecorator(this.domStoreExecutor);
+ this.commitExecutor = Preconditions.checkNotNull(commitExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
this.debugTransactions = debugTransactions;
}
public ExecutorService getDomStoreExecutor() {
- return domStoreExecutor;
+ return commitExecutor;
}
@Override
@Override
public void close() {
- ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+ ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS);
ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
if(closeable != null) {
@Override
public ListenableFuture<Boolean> canCommit() {
- return listeningExecutor.submit(new Callable<Boolean>() {
+ return commitExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws TransactionCommitFailedException {
try {
@Override
public ListenableFuture<Void> preCommit() {
- return listeningExecutor.submit(new Callable<Void>() {
+ return commitExecutor.submit(new Callable<Void>() {
@Override
public Void call() {
candidate = dataTree.prepare(modification);
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
@Nullable final InMemoryDOMDataStoreConfigProperties properties) {
InMemoryDOMDataStoreConfigProperties actualProperties = properties;
- if(actualProperties == null) {
+ if (actualProperties == null) {
actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault();
}
// task execution time to get higher throughput as DataChangeListeners typically provide
// much of the business logic for a data model. If the executor queue size limit is reached,
// subsequent submitted notifications will block the calling thread.
-
int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize();
int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize();
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
- ExecutorService domStoreExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
- actualProperties.getMaxDataStoreExecutorQueueSize(), "DOMStore-" + name );
-
- InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
- domStoreExecutor, dataChangeListenerExecutor,
+ final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor();
+ final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
+ commitExecutor, dataChangeListenerExecutor,
actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
- if(schemaService != null) {
+ if (schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
}
package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
import java.util.concurrent.ExecutorService;
-
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
*/
public class InMemoryDataStoreStats implements AutoCloseable {
- private final ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
- private final ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+ private final AbstractMXBean notificationExecutorStatsBean;
+ private final AbstractMXBean dataStoreExecutorStatsBean;
private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
- public InMemoryDataStoreStats(String mBeanType, QueuedNotificationManager<?, ?> manager,
- ExecutorService dataStoreExecutor) {
+ public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager,
+ final ExecutorService dataStoreExecutor) {
- this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+ notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", mBeanType, null);
notificationManagerStatsBean.registerMBean();
- this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+ notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor(),
"notification-executor", mBeanType, null);
- this.notificationExecutorStatsBean.registerMBean();
+ if (notificationExecutorStatsBean != null) {
+ notificationExecutorStatsBean.registerMBean();
+ }
- this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dataStoreExecutor,
+ dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor,
"data-store-executor", mBeanType, null);
- this.dataStoreExecutorStatsBean.registerMBean();
+ if (dataStoreExecutorStatsBean != null) {
+ dataStoreExecutorStatsBean.registerMBean();
+ }
}
@Override
<groupId>${project.groupId}</groupId>
<artifactId>netconf-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-util</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ </dependency>
</dependencies>
<build>
package org.opendaylight.controller.netconf.client;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
logger.debug("Netconf session {} should use exi.", session);
NetconfStartExiMessage startExiMessage = (NetconfStartExiMessage) sessionPreferences.getStartExiMessage();
tryToInitiateExi(session, startExiMessage);
- // Exi is not supported, release session immediately
} else {
+ // Exi is not supported, release session immediately
logger.debug("Netconf session {} isn't capable of using exi.", session);
negotiationSuccessful(session);
}
private long extractSessionId(final Document doc) {
final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
+ Preconditions.checkState(sessionIdNode != null, "");
String textContent = sessionIdNode.getTextContent();
if (textContent == null || textContent.equals("")) {
throw new IllegalStateException("Session id not received from server");
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import com.google.common.base.Optional;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+
+import java.net.InetSocketAddress;
+
+public class NetconfClientConfigurationTest {
+ @Test
+ public void testNetconfClientConfiguration() throws Exception {
+ Long timeout = 200L;
+ NetconfHelloMessageAdditionalHeader header = new NetconfHelloMessageAdditionalHeader("a", "host", "port", "trans", "id");
+ NetconfClientSessionListener listener = new SimpleNetconfClientSessionListener();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("host", 830);
+ ReconnectStrategy strategy = Mockito.mock(ReconnectStrategy.class);
+ AuthenticationHandler handler = Mockito.mock(AuthenticationHandler.class);
+ NetconfClientConfiguration cfg = NetconfClientConfigurationBuilder.create().
+ withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH).
+ withAddress(address).
+ withConnectionTimeoutMillis(timeout).
+ withReconnectStrategy(strategy).
+ withAdditionalHeader(header).
+ withSessionListener(listener).
+ withAuthHandler(handler).build();
+
+ Assert.assertEquals(timeout, cfg.getConnectionTimeoutMillis());
+ Assert.assertEquals(Optional.fromNullable(header), cfg.getAdditionalHeader());
+ Assert.assertEquals(listener, cfg.getSessionListener());
+ Assert.assertEquals(handler, cfg.getAuthHandler());
+ Assert.assertEquals(strategy, cfg.getReconnectStrategy());
+ Assert.assertEquals(NetconfClientConfiguration.NetconfClientProtocol.SSH, cfg.getProtocol());
+ Assert.assertEquals(address, cfg.getAddress());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+
+public class NetconfClientDispatcherImplTest {
+ @Test
+ public void testNetconfClientDispatcherImpl() throws Exception {
+ EventLoopGroup bossGroup = Mockito.mock(EventLoopGroup.class);
+ EventLoopGroup workerGroup = Mockito.mock(EventLoopGroup.class);
+ Timer timer = new HashedWheelTimer();
+
+ ChannelFuture chf = Mockito.mock(ChannelFuture.class);
+ Channel ch = Mockito.mock(Channel.class);
+ doReturn(ch).when(chf).channel();
+ Throwable thr = Mockito.mock(Throwable.class);
+ doReturn(chf).when(workerGroup).register(any(Channel.class));
+
+ ChannelPromise promise = Mockito.mock(ChannelPromise.class);
+ doReturn(promise).when(chf).addListener(any(GenericFutureListener.class));
+ doReturn(thr).when(chf).cause();
+
+ Long timeout = 200L;
+ NetconfHelloMessageAdditionalHeader header = new NetconfHelloMessageAdditionalHeader("a", "host", "port", "trans", "id");
+ NetconfClientSessionListener listener = new SimpleNetconfClientSessionListener();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("host", 830);
+ ReconnectStrategyFactory reconnectStrategyFactory = Mockito.mock(ReconnectStrategyFactory.class);
+ AuthenticationHandler handler = Mockito.mock(AuthenticationHandler.class);
+ ReconnectStrategy reconnect = Mockito.mock(ReconnectStrategy.class);
+
+ doReturn(5).when(reconnect).getConnectTimeout();
+ doReturn("").when(reconnect).toString();
+ doReturn("").when(handler).toString();
+ doReturn("").when(reconnectStrategyFactory).toString();
+ doReturn(reconnect).when(reconnectStrategyFactory).createReconnectStrategy();
+
+ NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create().
+ withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH).
+ withAddress(address).
+ withConnectionTimeoutMillis(timeout).
+ withReconnectStrategy(reconnect).
+ withAdditionalHeader(header).
+ withSessionListener(listener).
+ withConnectStrategyFactory(reconnectStrategyFactory).
+ withAuthHandler(handler).build();
+
+ NetconfReconnectingClientConfiguration cfg2 = NetconfReconnectingClientConfigurationBuilder.create().
+ withProtocol(NetconfClientConfiguration.NetconfClientProtocol.TCP).
+ withAddress(address).
+ withConnectionTimeoutMillis(timeout).
+ withReconnectStrategy(reconnect).
+ withAdditionalHeader(header).
+ withSessionListener(listener).
+ withConnectStrategyFactory(reconnectStrategyFactory).
+ withAuthHandler(handler).build();
+
+ NetconfClientDispatcherImpl dispatcher = new NetconfClientDispatcherImpl(bossGroup, workerGroup, timer);
+ Future<NetconfClientSession> sshSession = dispatcher.createClient(cfg);
+ Future<NetconfClientSession> tcpSession = dispatcher.createClient(cfg2);
+
+ Future<Void> sshReconn = dispatcher.createReconnectingClient(cfg);
+ Future<Void> tcpReconn = dispatcher.createReconnectingClient(cfg2);
+
+ assertNotNull(sshSession);
+ assertNotNull(tcpSession);
+ assertNotNull(sshReconn);
+ assertNotNull(tcpReconn);
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import com.google.common.base.Optional;
+import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.Promise;
+import org.apache.sshd.common.SessionListener;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class NetconfClientSessionNegotiatorFactoryTest {
+ @Test
+ public void testGetSessionNegotiator() throws Exception {
+ NetconfClientSessionListener sessionListener = mock(NetconfClientSessionListener.class);
+ Timer timer = new HashedWheelTimer();
+ SessionListenerFactory listenerFactory = mock(SessionListenerFactory.class);
+ doReturn(sessionListener).when(listenerFactory).getSessionListener();
+
+ Channel channel = mock(Channel.class);
+ Promise promise = mock(Promise.class);
+ NetconfClientSessionNegotiatorFactory negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer,
+ Optional.<NetconfHelloMessageAdditionalHeader>absent(), 200L);
+
+ SessionNegotiator sessionNegotiator = negotiatorFactory.getSessionNegotiator(listenerFactory, channel, promise);
+ assertNotNull(sessionNegotiator);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import com.google.common.base.Optional;
+import io.netty.channel.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+import org.apache.mina.handler.demux.ExceptionHandler;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.collections.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.netconf.api.NetconfClientSessionPreferences;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import io.netty.util.Timer;
+import org.opendaylight.controller.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.EXIOptions;
+import org.w3c.dom.Document;
+import java.util.Set;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class NetconfClientSessionNegotiatorTest {
+
+ private NetconfHelloMessage helloMessage;
+ private ChannelPipeline pipeline;
+ private ChannelFuture future;
+ private Channel channel;
+ private ChannelInboundHandlerAdapter channelInboundHandlerAdapter;
+
+ @Before
+ public void setUp() throws Exception {
+ helloMessage = NetconfHelloMessage.createClientHello(Sets.newSet("exi:1.0"), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+ pipeline = mockChannelPipeline();
+ future = mockChannelFuture();
+ channel = mockChannel();
+ System.out.println("setup done");
+ }
+
+ private ChannelHandler mockChannelHandler() {
+ ChannelHandler handler = mock(ChannelHandler.class);
+ return handler;
+ }
+
+ private Channel mockChannel() {
+ Channel channel = mock(Channel.class);
+ ChannelHandler channelHandler = mockChannelHandler();
+ doReturn("").when(channel).toString();
+ doReturn(future).when(channel).close();
+ doReturn(future).when(channel).writeAndFlush(anyObject());
+ doReturn(true).when(channel).isOpen();
+ doReturn(pipeline).when(channel).pipeline();
+ doReturn("").when(pipeline).toString();
+ doReturn(pipeline).when(pipeline).remove(any(ChannelHandler.class));
+ doReturn(channelHandler).when(pipeline).remove(anyString());
+ return channel;
+ }
+
+ private ChannelFuture mockChannelFuture() {
+ ChannelFuture future = mock(ChannelFuture.class);
+ doReturn(future).when(future).addListener(any(GenericFutureListener.class));
+ return future;
+ }
+
+ private ChannelPipeline mockChannelPipeline() {
+ ChannelPipeline pipeline = mock(ChannelPipeline.class);
+ ChannelHandler handler = mock(ChannelHandler.class);
+ doReturn(pipeline).when(pipeline).addAfter(anyString(), anyString(), any(ChannelHandler.class));
+ doReturn(null).when(pipeline).get(SslHandler.class);
+ doReturn(pipeline).when(pipeline).addLast(anyString(), any(ChannelHandler.class));
+ doReturn(handler).when(pipeline).replace(anyString(), anyString(), any(ChunkedFramingMechanismEncoder.class));
+
+ NetconfXMLToHelloMessageDecoder messageDecoder = new NetconfXMLToHelloMessageDecoder();
+ doReturn(messageDecoder).when(pipeline).replace(anyString(), anyString(), any(NetconfXMLToMessageDecoder.class));
+ doReturn(pipeline).when(pipeline).replace(any(ChannelHandler.class), anyString(), any(NetconfClientSession.class));
+ return pipeline;
+ }
+
+ private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(Promise promise,
+ NetconfMessage startExi) {
+ ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class);
+ NetconfClientSessionPreferences preferences = new NetconfClientSessionPreferences(helloMessage, startExi);
+ doReturn(progressivePromise).when(promise).setFailure(any(Throwable.class));
+
+ long timeout = 10L;
+ NetconfClientSessionListener sessionListener = mock(NetconfClientSessionListener.class);
+ Timer timer = new HashedWheelTimer();
+ return new NetconfClientSessionNegotiator(preferences, promise, channel, timer, sessionListener, timeout);
+ }
+
+ @Test
+ public void testNetconfClientSessionNegotiator() throws Exception {
+ Promise promise = mock(Promise.class);
+ doReturn(promise).when(promise).setSuccess(anyObject());
+ NetconfClientSessionNegotiator negotiator = createNetconfClientSessionNegotiator(promise, null);
+
+ negotiator.channelActive(null);
+ Set caps = Sets.newSet("a", "b");
+ NetconfHelloMessage helloServerMessage = NetconfHelloMessage.createServerHello(caps, 10);
+ negotiator.handleMessage(helloServerMessage);
+ verify(promise).setSuccess(anyObject());
+ }
+
+ @Test
+ public void testNetconfClientSessionNegotiatorWithEXI() throws Exception {
+ Promise promise = mock(Promise.class);
+ EXIOptions exiOptions = new EXIOptions();
+ NetconfStartExiMessage exiMessage = NetconfStartExiMessage.create(exiOptions, "msg-id");
+ doReturn(promise).when(promise).setSuccess(anyObject());
+ NetconfClientSessionNegotiator negotiator = createNetconfClientSessionNegotiator(promise, exiMessage);
+
+ negotiator.channelActive(null);
+ Set caps = Sets.newSet("exi:1.0");
+ NetconfHelloMessage helloMessage = NetconfHelloMessage.createServerHello(caps, 10);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ channelInboundHandlerAdapter = ((ChannelInboundHandlerAdapter) invocationOnMock.getArguments()[2]);
+ return null;
+ }
+ }).when(pipeline).addAfter(anyString(), anyString(), any(ChannelHandler.class));
+
+ ChannelHandlerContext handlerContext = mock(ChannelHandlerContext.class);
+ doReturn(pipeline).when(handlerContext).pipeline();
+ negotiator.handleMessage(helloMessage);
+ Document expectedResult = XmlFileLoader.xmlFileToDocument("netconfMessages/rpc-reply_ok.xml");
+ channelInboundHandlerAdapter.channelRead(handlerContext, new NetconfMessage(expectedResult));
+
+ verify(promise).setSuccess(anyObject());
+
+ // two calls for exiMessage, 2 for hello message
+ verify(pipeline, times(4)).replace(anyString(), anyString(), any(ChannelHandler.class));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.openexi.proc.common.EXIOptions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+
+public class NetconfClientSessionTest {
+
+ @Mock
+ ChannelHandler channelHandler;
+
+ @Mock
+ Channel channel;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testNetconfClientSession() throws Exception {
+ NetconfClientSessionListener sessionListener = mock(NetconfClientSessionListener.class);
+ long sessId = 20L;
+ Collection<String> caps = Lists.newArrayList("cap1", "cap2");
+
+ NetconfEXICodec codec = new NetconfEXICodec(new EXIOptions());
+ ChannelPipeline pipeline = mock(ChannelPipeline.class);
+
+ Mockito.doReturn(pipeline).when(channel).pipeline();
+ Mockito.doReturn(channelHandler).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
+ Mockito.doReturn("").when(channelHandler).toString();
+
+ NetconfClientSession session = new NetconfClientSession(sessionListener, channel, sessId, caps);
+ session.addExiHandlers(codec);
+ session.stopExiCommunication();
+
+ assertEquals(caps, session.getServerCapabilities());
+ assertEquals(session, session.thisInstance());
+
+ Mockito.verify(pipeline, Mockito.times(4)).replace(anyString(), anyString(), Mockito.any(ChannelHandler.class));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import com.google.common.base.Optional;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.config.yang.protocol.framework.NeverReconnectStrategyFactoryModule;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+
+import java.net.InetSocketAddress;
+
+public class NetconfReconnectingClientConfigurationTest {
+ @Test
+ public void testNetconfReconnectingClientConfiguration() throws Exception {
+ Long timeout = 200L;
+ NetconfHelloMessageAdditionalHeader header = new NetconfHelloMessageAdditionalHeader("a", "host", "port", "trans", "id");
+ NetconfClientSessionListener listener = new SimpleNetconfClientSessionListener();
+ InetSocketAddress address = InetSocketAddress.createUnresolved("host", 830);
+ ReconnectStrategyFactory strategy = Mockito.mock(ReconnectStrategyFactory.class);
+ AuthenticationHandler handler = Mockito.mock(AuthenticationHandler.class);
+ ReconnectStrategy reconnect = Mockito.mock(ReconnectStrategy.class);
+
+ NetconfReconnectingClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create().
+ withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH).
+ withAddress(address).
+ withConnectionTimeoutMillis(timeout).
+ withReconnectStrategy(reconnect).
+ withAdditionalHeader(header).
+ withSessionListener(listener).
+ withConnectStrategyFactory(strategy).
+ withAuthHandler(handler).build();
+
+ Assert.assertEquals(timeout, cfg.getConnectionTimeoutMillis());
+ Assert.assertEquals(Optional.fromNullable(header), cfg.getAdditionalHeader());
+ Assert.assertEquals(listener, cfg.getSessionListener());
+ Assert.assertEquals(handler, cfg.getAuthHandler());
+ Assert.assertEquals(strategy, cfg.getConnectStrategyFactory());
+ Assert.assertEquals(NetconfClientConfiguration.NetconfClientProtocol.SSH, cfg.getProtocol());
+ Assert.assertEquals(address, cfg.getAddress());
+ Assert.assertEquals(reconnect, cfg.getReconnectStrategy());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+import io.netty.channel.*;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.collections.Sets;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+public class SimpleNetconfClientSessionListenerTest {
+
+ private Channel channel;
+ private ChannelFuture channelFuture;
+ Set caps;
+ private NetconfHelloMessage helloMessage;
+ private NetconfMessage message;
+ private NetconfClientSessionListener sessionListener;
+ private NetconfClientSession clientSession;
+
+ @Before
+ public void setUp() throws Exception {
+ channel = mock(Channel.class);
+ channelFuture = mock(ChannelFuture.class);
+ doReturn(channelFuture).when(channel).writeAndFlush(anyObject());
+ caps = Sets.newSet("a", "b");
+ helloMessage = NetconfHelloMessage.createServerHello(caps, 10);
+ message = new NetconfMessage(helloMessage.getDocument());
+ sessionListener = mock(NetconfClientSessionListener.class);
+ clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps);
+ }
+
+ @Test
+ public void testSessionDown() throws Exception {
+ SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
+ Future<NetconfMessage> promise = simpleListener.sendRequest(message);
+ simpleListener.onSessionUp(clientSession);
+ verify(channel, times(1)).writeAndFlush(anyObject());
+
+ simpleListener.onSessionDown(clientSession, new Exception());
+ assertFalse(promise.isSuccess());
+ }
+
+ @Test
+ public void testSendRequest() throws Exception {
+ SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
+ Future<NetconfMessage> promise = simpleListener.sendRequest(message);
+ simpleListener.onSessionUp(clientSession);
+ verify(channel, times(1)).writeAndFlush(anyObject());
+
+ simpleListener.sendRequest(message);
+ assertFalse(promise.isSuccess());
+ }
+
+ @Test
+ public void testOnMessage() throws Exception {
+ SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
+ Future<NetconfMessage> promise = simpleListener.sendRequest(message);
+ simpleListener.onSessionUp(clientSession);
+ verify(channel, times(1)).writeAndFlush(anyObject());
+
+ simpleListener.onMessage(clientSession, message);
+ assertTrue(promise.isSuccess());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.concurrent.Promise;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiator;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class SshClientChannelInitializerTest {
+ @Test
+ public void test() throws Exception {
+
+ AuthenticationHandler authenticationHandler = mock(AuthenticationHandler.class);
+ NetconfClientSessionNegotiatorFactory negotiatorFactory = mock(NetconfClientSessionNegotiatorFactory.class);
+ NetconfClientSessionListener sessionListener = mock(NetconfClientSessionListener.class);
+
+ SessionNegotiator sessionNegotiator = mock(SessionNegotiator.class);
+ doReturn("").when(sessionNegotiator).toString();
+ doReturn(sessionNegotiator).when(negotiatorFactory).getSessionNegotiator(any(SessionListenerFactory.class), any(Channel.class), any(Promise.class));
+ ChannelPipeline pipeline = mock(ChannelPipeline.class);
+ doReturn(pipeline).when(pipeline).addAfter(anyString(), anyString(), any(ChannelHandler.class));
+ Channel channel = mock(Channel.class);
+ doReturn(pipeline).when(channel).pipeline();
+ doReturn("").when(channel).toString();
+ doReturn(pipeline).when(pipeline).addFirst(any(ChannelHandler.class));
+ doReturn(pipeline).when(pipeline).addLast(anyString(), any(ChannelHandler.class));
+
+ Promise<NetconfClientSession> promise = mock(Promise.class);
+ doReturn("").when(promise).toString();
+
+ SshClientChannelInitializer initializer = new SshClientChannelInitializer(authenticationHandler, negotiatorFactory,
+ sessionListener);
+ initializer.initialize(channel, promise);
+ verify(pipeline, times(1)).addFirst(any(ChannelHandler.class));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.concurrent.Promise;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.nettyutil.AbstractChannelInitializer;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiator;
+
+import static org.mockito.Mockito.*;
+
+public class TcpClientChannelInitializerTest {
+ @Test
+ public void testInitializeSessionNegotiator() throws Exception {
+ NetconfClientSessionNegotiatorFactory factory = mock(NetconfClientSessionNegotiatorFactory.class);
+ SessionNegotiator sessionNegotiator = mock(SessionNegotiator.class);
+ doReturn("").when(sessionNegotiator).toString();
+ doReturn(sessionNegotiator).when(factory).getSessionNegotiator(any(SessionListenerFactory.class), any(Channel.class), any(Promise.class));
+ NetconfClientSessionListener listener = mock(NetconfClientSessionListener.class);
+ TcpClientChannelInitializer initializer = new TcpClientChannelInitializer(factory, listener);
+ ChannelPipeline pipeline = mock(ChannelPipeline.class);
+ doReturn(pipeline).when(pipeline).addAfter(anyString(), anyString(), any(ChannelHandler.class));
+ Channel channel = mock(Channel.class);
+ doReturn(pipeline).when(channel).pipeline();
+ doReturn("").when(channel).toString();
+
+ Promise<NetconfClientSession> promise = mock(Promise.class);
+ doReturn("").when(promise).toString();
+
+ initializer.initializeSessionNegotiator(channel, promise);
+ verify(pipeline, times(1)).addAfter(anyString(), anyString(), any(ChannelHandler.class));
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.netconf.client.test;
+package org.opendaylight.controller.netconf.client;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
-import org.opendaylight.controller.netconf.client.NetconfClientSession;
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
-import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration.NetconfClientProtocol;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshotImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
public class JaxBSerializerTest {
@Test
- public void testName() throws Exception {
+ public void testSerialization() throws Exception {
final NetconfMonitoringService service = new NetconfMonitoringService() {
}
};
final NetconfState model = new NetconfState(service);
- final String xml = XmlUtil.toString(new JaxBSerializer().toXml(model));
+ final String xml = XmlUtil.toString(new JaxBSerializer().toXml(model)).replaceAll("\\s", "");
assertThat(xml, CoreMatchers.containsString(
- "<schema>\n" +
- "<format>yang</format>\n" +
- "<identifier>id</identifier>\n" +
- "<location>NETCONF</location>\n" +
- "<namespace>localhost</namespace>\n" +
- "<version>v1</version>\n" +
- "</schema>\n"));
+ "<schema>" +
+ "<format>yang</format>" +
+ "<identifier>id</identifier>" +
+ "<location>NETCONF</location>" +
+ "<namespace>localhost</namespace>" +
+ "<version>v1</version>" +
+ "</schema>"));
assertThat(xml, CoreMatchers.containsString(
- "<session>\n" +
- "<session-id>1</session-id>\n" +
- "<in-bad-rpcs>0</in-bad-rpcs>\n" +
- "<in-rpcs>0</in-rpcs>\n" +
- "<login-time>loginTime</login-time>\n" +
- "<out-notifications>0</out-notifications>\n" +
- "<out-rpc-errors>0</out-rpc-errors>\n" +
- "<ncme:session-identifier>client</ncme:session-identifier>\n" +
- "<source-host>address/port</source-host>\n" +
- "<transport>ncme:netconf-tcp</transport>\n" +
- "<username>username</username>\n" +
+ "<session>" +
+ "<session-id>1</session-id>" +
+ "<in-bad-rpcs>0</in-bad-rpcs>" +
+ "<in-rpcs>0</in-rpcs>" +
+ "<login-time>loginTime</login-time>" +
+ "<out-notifications>0</out-notifications>" +
+ "<out-rpc-errors>0</out-rpc-errors>" +
+ "<ncme:session-identifier>client</ncme:session-identifier>" +
+ "<source-host>address/port</source-host>" +
+ "<transport>ncme:netconf-tcp</transport>" +
+ "<username>username</username>" +
"</session>"));
}
assertThat(out.get(0), CoreMatchers.instanceOf(NetconfHelloMessage.class));
final NetconfHelloMessage hello = (NetconfHelloMessage) out.get(0);
assertTrue(hello.getAdditionalHeader().isPresent());
- assertEquals("[tomas;10.0.0.0:10000;tcp;client;]\n", hello.getAdditionalHeader().get().toFormattedString());
+ assertEquals("[tomas;10.0.0.0:10000;tcp;client;]" + System.lineSeparator(), hello.getAdditionalHeader().get().toFormattedString());
assertThat(XmlUtil.toString(hello.getDocument()), CoreMatchers.containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\""));
}
netconfSSHServer.setAuthProvider(authProvider);
InetSocketAddress address = netconfSSHServer.getLocalSocketAddress();
- final EchoClientHandler echoClientHandler = connectClient(address);
+
+ final EchoClientHandler echoClientHandler = connectClient(new InetSocketAddress("localhost", address.getPort()));
+
Stopwatch stopwatch = new Stopwatch().start();
while(echoClientHandler.isConnected() == false && stopwatch.elapsed(TimeUnit.SECONDS) < 5) {
Thread.sleep(100);
Document doc = XmlUtil.newDocument();
Element helloElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
HELLO_TAG);
- Element capabilitiesElement = doc.createElement(XmlNetconfConstants.CAPABILITIES);
+ Element capabilitiesElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.CAPABILITIES);
for (String capability : Sets.newHashSet(capabilities)) {
- Element capElement = doc.createElement(XmlNetconfConstants.CAPABILITY);
+ Element capElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.CAPABILITY);
capElement.setTextContent(capability);
capabilitiesElement.appendChild(capElement);
}
public static NetconfHelloMessage createServerHello(Set<String> capabilities, long sessionId) throws NetconfDocumentedException {
Document doc = createHelloMessageDoc(capabilities);
- Element sessionIdElement = doc.createElement(XmlNetconfConstants.SESSION_ID);
+ Element sessionIdElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.SESSION_ID);
sessionIdElement.setTextContent(Long.toString(sessionId));
doc.getDocumentElement().appendChild(sessionIdElement);
return new NetconfHelloMessage(doc);