package org.opendaylight.controller.sal.compatibility;
+import java.util.List;
+
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
public class InventoryNotificationProvider implements AutoCloseable{
private ListenerRegistration<DataChangeListener> nodeConnectorDataChangeListenerRegistration;
&& inventoryPublisher!= null){
if(nodeConnectorDataChangeListener == null){
- InstanceIdentifier nodeConnectorPath = InstanceIdentifier.builder(Nodes.class).child(Node.class).child(NodeConnector.class).build();
+ InstanceIdentifier<NodeConnector> nodeConnectorPath = InstanceIdentifier.builder(Nodes.class).child(Node.class).child(NodeConnector.class).build();
nodeConnectorDataChangeListener = new NodeConnectorDataChangeListener();
nodeConnectorDataChangeListener.setInventoryPublisher(inventoryPublisher);
nodeConnectorDataChangeListenerRegistration = dataProviderService.registerDataChangeListener(nodeConnectorPath, nodeConnectorDataChangeListener);
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javassist.ClassPool;
CLASS_POOL);
public static final RuntimeCodeGenerator RPC_GENERATOR = RPC_GENERATOR_IMPL;
public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
+
+ public static final int CORE_NOTIFICATION_THREADS = 4;
+ public static final int MAX_NOTIFICATION_THREADS = 32;
+ public static final int NOTIFICATION_THREAD_LIFE = 15;
+
private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
private static ListeningExecutorService COMMIT_EXECUTOR = null;
private static ListeningExecutorService CHANGE_EVENT_EXECUTOR = null;
+ /**
+ * @deprecated This method is only used from configuration modules and thus callers of it
+ * should use service injection to make the executor configurable.
+ */
+ @Deprecated
public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
if (NOTIFICATION_EXECUTOR == null) {
- NOTIFICATION_EXECUTOR = createNamedExecutor("md-sal-binding-notification-%d");
+ ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-notification-%d").build();
+ ExecutorService executor = new ThreadPoolExecutor(CORE_NOTIFICATION_THREADS, MAX_NOTIFICATION_THREADS,
+ NOTIFICATION_THREAD_LIFE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
+ NOTIFICATION_EXECUTOR = MoreExecutors.listeningDecorator(executor);
}
return NOTIFICATION_EXECUTOR;
}
public static synchronized final ListeningExecutorService getDefaultCommitExecutor() {
if (COMMIT_EXECUTOR == null) {
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-commit-%d").build();
- /*
- * FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
- * ordering guarantees, which means that using a concurrent threadpool results
- * in application data being committed in random order, potentially resulting
- * in inconsistent data being present. Once proper primitives are introduced,
- * concurrency can be reintroduced.
- */
+ /*
+ * FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
+ * ordering guarantees, which means that using a concurrent threadpool results
+ * in application data being committed in random order, potentially resulting
+ * in inconsistent data being present. Once proper primitives are introduced,
+ * concurrency can be reintroduced.
+ */
ExecutorService executor = Executors.newSingleThreadExecutor(factory);
COMMIT_EXECUTOR = MoreExecutors.listeningDecorator(executor);
}
return COMMIT_EXECUTOR;
}
- private static ListeningExecutorService createNamedExecutor(String format) {
- ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(format).build();
- ExecutorService executor = Executors.newCachedThreadPool(factory);
- return MoreExecutors.listeningDecorator(executor);
- }
-
public static ExecutorService getDefaultChangeEventExecutor() {
if (CHANGE_EVENT_EXECUTOR == null) {
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build();
package org.opendaylight.controller.sal.binding.impl\r
\r
import com.google.common.collect.HashMultimap\r
+import com.google.common.collect.ImmutableSet\r
import com.google.common.collect.Multimap\r
-import java.util.Collection\r
+import com.google.common.collect.Multimaps\r
import java.util.Collections\r
import java.util.concurrent.Callable\r
import java.util.concurrent.ExecutorService\r
+import java.util.concurrent.Future\r
+import java.util.Set\r
import org.opendaylight.controller.sal.binding.api.NotificationListener\r
import org.opendaylight.controller.sal.binding.api.NotificationProviderService\r
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\r
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\r
import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker\r
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
import org.opendaylight.yangtools.concepts.ListenerRegistration\r
import org.opendaylight.yangtools.concepts.Registration\r
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
import org.opendaylight.yangtools.yang.binding.Notification\r
import org.slf4j.LoggerFactory\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\rimport com.google.common.collect.Multimaps\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\rimport java.util.Set\r
-import java.util.Set\r
-import com.google.common.collect.ImmutableSet\r
-import java.util.concurrent.Future\r
\r
class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
\r
notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]\r
}\r
\r
- @SuppressWarnings("unchecked")\r
- private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {\r
- listeners.forEach[(it as NotificationListener).onNotification(notification)]\r
- }\r
-\r
@Deprecated\r
override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");\r
*/
package org.opendaylight.controller.sal.binding.impl.util
-import java.util.Map.Entry
-import org.opendaylight.yangtools.concepts.Path
-import java.util.Map
-import java.util.Set
+import com.google.common.collect.Multimap
import java.util.Collection
import java.util.HashSet
-import com.google.common.collect.Multimap
+import java.util.Map.Entry
+import org.opendaylight.yangtools.concepts.Path
class MapUtils {
public interface SimpleInput extends DataObject,Augmentable<SimpleInput> {
@RoutingContext(BaseIdentity.class)
- InstanceIdentifier getIdentifier();
+ InstanceIdentifier<?> getIdentifier();
}
public static final String CONTROLLER_MODELS = "org.opendaylight.controller.model";
public static final String YANGTOOLS_MODELS = "org.opendaylight.yangtools.model";
- private static final String OPENDAYLIGHT_SNAPSHOT = "http://nexus.opendaylight.org/content/repositories/opendaylight.snapshot/";
- private static final String OPENDAYLIGHT_RELEASE = "http://nexus.opendaylight.org/content/repositories/opendaylight.release/";
public static Option mdSalCoreBundles() {
return new DefaultCompositeOption( //
*/
package org.opendaylight.controller.test.sal.binding.it;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.core.api.Broker;
public class DataServiceTest extends AbstractTest {
protected DataBrokerService consumerDataService;
-
-
+
+
@Inject
Broker broker2;
assertNotNull(consumerDataService);
-
+
DataModificationTransaction transaction = consumerDataService.beginTransaction();
assertNotNull(transaction);
-
+
NodeRef node1 = createNodeRef("0");
DataObject node = consumerDataService.readConfigurationData(node1.getValue());
assertNull(node);
Node nodeData1 = createNode("0");
-
+
transaction.putConfigurationData(node1.getValue(), nodeData1);
Future<RpcResult<TransactionStatus>> commitResult = transaction.commit();
assertNotNull(commitResult);
-
+
RpcResult<TransactionStatus> result = commitResult.get();
-
+
assertNotNull(result);
assertNotNull(result.getResult());
assertEquals(TransactionStatus.COMMITED, result.getResult());
-
+
Node readedData = (Node) consumerDataService.readConfigurationData(node1.getValue());
assertNotNull(readedData);
assertEquals(nodeData1.getKey(), readedData.getKey());
-
-
+
+
DataModificationTransaction transaction2 = consumerDataService.beginTransaction();
assertNotNull(transaction);
-
+
transaction2.removeConfigurationData(node1.getValue());
-
+
Future<RpcResult<TransactionStatus>> commitResult2 = transaction2.commit();
assertNotNull(commitResult2);
-
+
RpcResult<TransactionStatus> result2 = commitResult2.get();
-
+
assertNotNull(result2);
assertNotNull(result2.getResult());
assertEquals(TransactionStatus.COMMITED, result2.getResult());
-
+
DataObject readedData2 = consumerDataService.readConfigurationData(node1.getValue());
assertNull(readedData2);
-
-
+
+
}
-
+
private static NodeRef createNodeRef(String string) {
NodeKey key = new NodeKey(new NodeId(string));
- InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
- .toInstance();
+ InstanceIdentifier<Node> path = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
return new NodeRef(path);
}
-
+
private static Node createNode(String string) {
NodeBuilder ret = new NodeBuilder();
NodeId id = new NodeId(string);
*/
private static NodeRef createNodeRef(String string) {
NodeKey key = new NodeKey(new NodeId(string));
- InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
- .toInstance();
+ InstanceIdentifier<Node> path = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).build();
return new NodeRef(path);
}
import com.google.common.collect.ImmutableSet;
public class RoutingUtils {
-
+
public static <C,P> RouteChange<C,P> removalChange(C context,P path) {
final ImmutableMap<C, Set<P>> announcements = ImmutableMap.<C,Set<P>>of();
final ImmutableMap<C, Set<P>> removals = ImmutableMap.<C,Set<P>>of(context, ImmutableSet.of(path));
return new RouteChangeImpl<C,P>(announcements, removals);
}
-
+
public static <C,P> RouteChange<C,P> announcementChange(C context,P path) {
final ImmutableMap<C, Set<P>> announcements = ImmutableMap.<C,Set<P>>of(context, ImmutableSet.of(path));
final ImmutableMap<C, Set<P>> removals = ImmutableMap.<C,Set<P>>of();
return new RouteChangeImpl<C,P>(announcements, removals);
}
-
-
+
+
public static <C,P> RouteChange<C,P> change(Map<C, Set<P>> announcements,
Map<C, Set<P>> removals) {
final ImmutableMap<C, Set<P>> immutableAnnouncements = ImmutableMap.<C,Set<P>>copyOf(announcements);
final ImmutableMap<C, Set<P>> immutableRemovals = ImmutableMap.<C,Set<P>>copyOf(removals);
return new RouteChangeImpl<C,P>(immutableAnnouncements, immutableRemovals);
}
-
-
+
+
private static class RouteChangeImpl<C,P> implements RouteChange<C, P> {
private final Map<C, Set<P>> removal;
private final Map<C, Set<P>> announcement;
public Map<C, Set<P>> getAnnouncements() {
return announcement;
}
-
+
@Override
public Map<C, Set<P>> getRemovals() {
return removal;
if (getClass() != obj.getClass()) {
return false;
}
- RouteChangeImpl other = (RouteChangeImpl) obj;
+ RouteChangeImpl<?, ?> other = (RouteChangeImpl<?, ?>) obj;
if (announcement == null) {
if (other.announcement != null)
return false;
}
-
+
}
Preconditions.checkNotNull(transaction);
transaction.changeStatus(TransactionStatus.SUBMITED);
final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
- ;
+
this.getSubmittedTransactionsCount().getAndIncrement();
return this.getExecutor().submit(task);
}
log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths);
+ // The transaction has no effects, let's just shortcut it
+ if (changedPaths.isEmpty()) {
+ dataBroker.getFinishedTransactionsCount().getAndIncrement();
+ transaction.changeStatus(TransactionStatus.COMMITED);
+
+ log.trace("Transaction: {} Finished successfully (no effects).", transactionId);
+
+ return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
+ Collections.<RpcError> emptySet());
+ }
+
final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
@Override
public void run() {
for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
- {
- DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
- for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
- try {
- listener.getInstance().onDataChanged(changeEvent);
- } catch (Exception e) {
- log.error("Unhandled exception when invoking listener {}", listener);
- }
+ DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
+ for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
+ try {
+ listener.getInstance().onDataChanged(changeEvent);
+ } catch (Exception e) {
+ log.error("Unhandled exception when invoking listener {}", listener, e);
}
}
}
package org.opendaylight.controller.sal.dom.broker;
import java.util.Collections
-import java.util.HashMap
import java.util.HashSet
-import java.util.Map
import java.util.Set
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
import org.opendaylight.controller.sal.core.api.Broker
-import org.opendaylight.controller.sal.core.api.BrokerService
import org.opendaylight.controller.sal.core.api.Consumer
import org.opendaylight.controller.sal.core.api.Provider
-import org.opendaylight.controller.sal.core.spi.BrokerModule
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
import org.opendaylight.yangtools.yang.common.QName
import org.opendaylight.yangtools.yang.common.RpcResult
import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
import org.osgi.framework.BundleContext
import org.slf4j.LoggerFactory
-import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
-import org.opendaylight.yangtools.concepts.ListenerRegistration
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
private static val log = LoggerFactory.getLogger(BrokerImpl);
*/
package org.opendaylight.controller.sal.dom.broker.impl
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.yang.common.QName
import java.net.URI
-import java.util.List
-import org.opendaylight.yangtools.yang.data.api.Node
import java.util.ArrayList
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import java.util.Map
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifier
-import org.opendaylight.yangtools.yang.data.api.SimpleNode
+import java.util.Collection
import java.util.Collections
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates
import java.util.HashMap
-import static com.google.common.base.Preconditions.*;
-import java.util.Collection
-import java.util.Set
+import java.util.Map
import java.util.Map.Entry
-import org.slf4j.LoggerFactory
+import java.util.Set
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument
+import org.opendaylight.yangtools.yang.data.api.Node
+import org.opendaylight.yangtools.yang.data.api.SimpleNode
import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.slf4j.LoggerFactory
+
+import static com.google.common.base.Preconditions.*
class DataReaderRouter extends AbstractDataReadRouter<InstanceIdentifier, CompositeNode> {
private static val LOG = LoggerFactory.getLogger(DataReaderRouter);
var name = pathArgument?.nodeType;
val nodes = new ArrayList<Node<?>>();
val keyNodes = new HashMap<QName, SimpleNode<?>>();
- val iterator = data.iterator;
for(dataBit : data) {
try {
if(pathArgument != null && dataBit != null) {
protected CompositeNode mergeData(InstanceIdentifier path, CompositeNode stored, CompositeNode modified,
boolean config) {
- long startTime = System.nanoTime();
+ // long startTime = System.nanoTime();
try {
DataSchemaNode node = schemaNodeFor(path);
return YangDataOperations.merge(node, stored, modified, config);
throw new IllegalArgumentException("Supplied node is not data node container.");
}
- private def static checkConfigurational(DataSchemaNode node, boolean config) {
- if (config) {
- checkArgument(node.configuration, "Supplied composite node is not configurational.");
- }
- }
-
private static dispatch def Iterable<? extends Node<?>> mergeMultiple(LeafSchemaNode node, List<Node<?>> original,
List<Node<?>> modified, boolean configurational) {
checkArgument(original.size === 1);
import java.util.List
import java.util.Set
import java.util.concurrent.atomic.AtomicInteger
+import org.opendaylight.controller.netconf.api.NetconfMessage
import org.opendaylight.controller.sal.common.util.Rpcs
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.common.RpcResult
import org.opendaylight.yangtools.yang.data.api.CompositeNode
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
-import java.util.Collections
-import java.util.List
-import java.util.Set
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument
import org.opendaylight.yangtools.yang.data.api.Node
import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition
import org.opendaylight.yangtools.yang.model.api.SchemaContext
import org.w3c.dom.Document
import org.w3c.dom.Element
-import org.opendaylight.yangtools.yang.common.QName
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import org.opendaylight.yangtools.yang.common.RpcResult
class NetconfMapping {
private static void registerAsJMXListener(MBeanServerConnection mBeanServerConnection, ConfigPersisterNotificationListener listener) {
logger.trace("Called registerAsJMXListener");
try {
- mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.objectName, listener, null, null);
+ mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.OBJECT_NAME, listener, null, null);
} catch (InstanceNotFoundException | IOException e) {
throw new RuntimeException("Cannot register as JMX listener to netconf", e);
}
@Override
public synchronized void close() {
// unregister from JMX
- ObjectName on = DefaultCommitOperationMXBean.objectName;
+ ObjectName on = DefaultCommitOperationMXBean.OBJECT_NAME;
try {
if (mBeanServerConnection.isRegistered(on)) {
mBeanServerConnection.removeNotificationListener(on, listener);
private final long sessionId;
private boolean up = false;
- protected final Channel channel;
+ private final Channel channel;
protected AbstractNetconfSession(L sessionListener, Channel channel, long sessionId) {
this.sessionListener = sessionListener;
this.channel = channel;
this.sessionId = sessionId;
- logger.debug("Session {} created", toString());
+ logger.debug("Session {} created", sessionId);
}
protected abstract S thisInstance();
package org.opendaylight.controller.netconf.api;
public class NetconfSessionPreferences {
- protected final NetconfMessage helloMessage;
+ private final NetconfMessage helloMessage;
public NetconfSessionPreferences(final NetconfMessage helloMessage) {
this.helloMessage = helloMessage;
private final Element configSnapshot;
- private static final String afterCommitMessageTemplate = "Commit successful: %s";
+ private static final String AFTER_COMMIT_MESSAGE_TEMPLATE = "Commit successful: %s";
private final Set<String> capabilities;
CommitJMXNotification(NotificationBroadcasterSupport source, String message, Element cfgSnapshot,
Set<String> capabilities) {
- super(TransactionProviderJMXNotificationType.commit, source, String.format(afterCommitMessageTemplate, message));
+ super(TransactionProviderJMXNotificationType.commit, source, String.format(AFTER_COMMIT_MESSAGE_TEMPLATE, message));
this.configSnapshot = cfgSnapshot;
this.capabilities = capabilities;
}
public interface DefaultCommitOperationMXBean {
- static String typeName = "NetconfNotificationProvider";
- public static ObjectName objectName = ObjectNameUtil.createONWithDomainAndType(typeName);
+ String TYPE_NAME = "NetconfNotificationProvider";
+ ObjectName OBJECT_NAME = ObjectNameUtil.createONWithDomainAndType(TYPE_NAME);
}
private final MBeanServer mbeanServer;
- private final ObjectName on = DefaultCommitOperationMXBean.objectName;
+ private final ObjectName on = DefaultCommitOperationMXBean.OBJECT_NAME;
public DefaultCommitNotificationProducer(MBeanServer mBeanServer) {
this.mbeanServer = mBeanServer;
logger.debug("Additional header from hello parsed as {} from {}", parsedHeader, additionalHeader);
- return new NetconfServerSession(sessionListener, channel, sessionPreferences.getSessionId(), parsedHeader);
+ return new NetconfServerSession(sessionListener, channel, getSessionPreferences().getSessionId(), parsedHeader);
}
}
package org.opendaylight.controller.netconf.impl;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-public class MessageHeaderTest {
-
- private NetconfMessageHeader header = null;
-
- @Before
- public void setUp() {
- this.header = new NetconfMessageHeader();
- }
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
+public class MessageHeaderTest {
@Test
public void testFromBytes() {
final byte[] raw = new byte[] { (byte) 0x0a, (byte) 0x23, (byte) 0x35, (byte) 0x38, (byte) 0x0a };
- this.header.fromBytes(raw);
- assertEquals(58, this.header.getLength());
+ NetconfMessageHeader header = NetconfMessageHeader.fromBytes(raw);
+ assertEquals(58, header.getLength());
}
@Test
public void testToBytes() {
- this.header.setLength(123);
+ NetconfMessageHeader header = new NetconfMessageHeader(123);
assertArrayEquals(new byte[] { (byte) 0x0a, (byte) 0x23, (byte) 0x31, (byte) 0x32, (byte) 0x33, (byte) 0x0a },
- this.header.toBytes());
+ header.toBytes());
}
}
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.handler.ChunkedFramingMechanismEncoder;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK),
new NetconfMessageToXMLEncoder(),
- new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(),
+ new NetconfChunkAggregator(),
new NetconfXMLToMessageDecoder());
testChunkChannel.writeOutbound(this.msg);
enc.encode(null, msg, out);
int msgLength = out.readableBytes();
- int chunkCount = msgLength / NetconfMessageConstants.MAX_CHUNK_SIZE;
- if ((msgLength % NetconfMessageConstants.MAX_CHUNK_SIZE) != 0) {
+ int chunkCount = msgLength / ChunkedFramingMechanismEncoder.DEFAULT_CHUNK_SIZE;
+ if ((msgLength % ChunkedFramingMechanismEncoder.DEFAULT_CHUNK_SIZE) != 0) {
chunkCount++;
}
for (int i = 1; i <= chunkCount; i++) {
ByteBuf recievedOutbound = (ByteBuf) messages.poll();
- int exptHeaderLength = NetconfMessageConstants.MAX_CHUNK_SIZE;
+ int exptHeaderLength = ChunkedFramingMechanismEncoder.DEFAULT_CHUNK_SIZE;
if (i == chunkCount) {
- exptHeaderLength = msgLength - (NetconfMessageConstants.MAX_CHUNK_SIZE * (i - 1));
- byte[] eom = new byte[NetconfMessageConstants.endOfChunk.length];
- recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.endOfChunk.length,
+ exptHeaderLength = msgLength - (ChunkedFramingMechanismEncoder.DEFAULT_CHUNK_SIZE * (i - 1));
+ byte[] eom = new byte[NetconfMessageConstants.END_OF_CHUNK.length];
+ recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.END_OF_CHUNK.length,
eom);
- assertArrayEquals(NetconfMessageConstants.endOfChunk, eom);
+ assertArrayEquals(NetconfMessageConstants.END_OF_CHUNK, eom);
}
byte[] header = new byte[String.valueOf(exptHeaderLength).length()
+ NetconfMessageConstants.MIN_HEADER_LENGTH - 1];
recievedOutbound.getBytes(0, header);
- NetconfMessageHeader messageHeader = new NetconfMessageHeader();
- messageHeader.fromBytes(header);
+ NetconfMessageHeader messageHeader = NetconfMessageHeader.fromBytes(header);
assertEquals(exptHeaderLength, messageHeader.getLength());
testChunkChannel.writeInbound(recievedOutbound);
public void testEOMFramingMechanismOnPipeline() throws Exception {
EmbeddedChannel testChunkChannel = new EmbeddedChannel(
FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM),
- new NetconfMessageToXMLEncoder(), new NetconfMessageAggregator(
- FramingMechanism.EOM), new NetconfXMLToMessageDecoder());
+ new NetconfMessageToXMLEncoder(), new NetconfEOMAggregator(), new NetconfXMLToMessageDecoder());
testChunkChannel.writeOutbound(this.msg);
ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound();
- byte[] eom = new byte[NetconfMessageConstants.endOfMessage.length];
- recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.endOfMessage.length, eom);
- assertArrayEquals(NetconfMessageConstants.endOfMessage, eom);
+ byte[] eom = new byte[NetconfMessageConstants.END_OF_MESSAGE.length];
+ recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.END_OF_MESSAGE.length, eom);
+ assertArrayEquals(NetconfMessageConstants.END_OF_MESSAGE, eom);
testChunkChannel.writeInbound(recievedOutbound);
NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound();
*/
package org.opendaylight.controller.netconf.it;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import static junit.framework.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import io.netty.channel.ChannelFuture;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
-import javax.management.InstanceNotFoundException;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static junit.framework.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class NetconfConfigPersisterITTest extends AbstractNetconfConfigTest {
- private static final Logger logger = LoggerFactory.getLogger(NetconfConfigPersisterITTest.class);
-
private static final InetSocketAddress tcpAddress = new InetSocketAddress("127.0.0.1", 12023);
private VerifyingNotificationListener createCommitNotificationListener() throws InstanceNotFoundException {
VerifyingNotificationListener listener = new VerifyingNotificationListener();
- platformMBeanServer.addNotificationListener(DefaultCommitNotificationProducer.objectName, listener, null, null);
+ platformMBeanServer.addNotificationListener(DefaultCommitNotificationProducer.OBJECT_NAME, listener, null, null);
return listener;
}
public VerifyingPersister() throws IOException {
Persister mockedAggregator = mock(Persister.class);
- doAnswer(new Answer() {
+ doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ConfigSnapshotHolder configSnapshot = (ConfigSnapshotHolder) invocation.getArguments()[0];
*/
package org.opendaylight.controller.netconf.it;
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import io.netty.channel.ChannelFuture;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
import junit.framework.Assert;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.matchers.JUnitMatchers;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
public class NetconfMonitoringITTest extends AbstractNetconfConfigTest {
}
}
+ sock.close();
+
org.junit.Assert.assertThat(responseBuilder.toString(), JUnitMatchers.containsString("<capability>urn:ietf:params:netconf:capability:candidate:1.0</capability>"));
org.junit.Assert.assertThat(responseBuilder.toString(), JUnitMatchers.containsString("<username>tomas</username>"));
}
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfHelloMessageToXMLEncoder;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
public static final String NETCONF_SESSION_NEGOTIATOR = "negotiator";
public void initialize(SocketChannel ch, Promise<S> promise) {
- ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfMessageAggregator(FramingMechanism.EOM));
+ ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfEOMAggregator());
initializeMessageDecoder(ch);
ch.pipeline().addLast(NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
initializeMessageEncoder(ch);
package org.opendaylight.controller.netconf.util;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
import java.util.concurrent.TimeUnit;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
- public static final String CHUNK_DECODER_CHANNEL_HANDLER_KEY = "chunkDecoder";
- protected final P sessionPreferences;
+ private final P sessionPreferences;
private final L sessionListener;
private Timeout timeout;
Future<Channel> future = sslHandler.get().handshakeFuture();
future.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
- public void operationComplete(Future<? super Channel> future) throws Exception {
+ public void operationComplete(Future<? super Channel> future) {
Preconditions.checkState(future.isSuccess(), "Ssl handshake was not successful");
logger.debug("Ssl handshake complete");
start();
}
});
- } else
+ } else {
start();
+ }
}
private static Optional<SslHandler> getSslHandler(Channel channel) {
return sslHandler == null ? Optional.<SslHandler> absent() : Optional.of(sslHandler);
}
+ public P getSessionPreferences() {
+ return sessionPreferences;
+ }
+
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
- channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ChannelHandler() {
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.warn("An exception occurred during negotiation on channel {}", channel.localAddress(), cause);
- cancelTimeout();
- negotiationFailed(cause);
- changeState(State.FAILED);
- }
- });
+ channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
timeout = this.timer.newTimeout(new TimerTask() {
@Override
- public void run(final Timeout timeout) throws Exception {
+ public void run(final Timeout timeout) {
synchronized (this) {
if (state != State.ESTABLISHED) {
logger.debug("Connection timeout after {}, session is in state {}", timeout, state);
}
private void cancelTimeout() {
- if(timeout!=null)
+ if(timeout!=null) {
timeout.cancel();
+ }
}
private void sendMessage(NetconfMessage message) {
@Override
protected void handleMessage(NetconfHelloMessage netconfMessage) {
- final Document doc = netconfMessage.getDocument();
+ Preconditions.checkNotNull(netconfMessage != null, "netconfMessage");
- // Only Hello message should arrive during negotiation
- if (netconfMessage instanceof NetconfHelloMessage) {
+ final Document doc = netconfMessage.getDocument();
- replaceHelloMessageHandlers();
+ replaceHelloMessageHandlers();
- if (shouldUseChunkFraming(doc)) {
- insertChunkFramingToPipeline();
- }
+ if (shouldUseChunkFraming(doc)) {
+ insertChunkFramingToPipeline();
+ }
- changeState(State.ESTABLISHED);
- S session = getSession(sessionListener, channel, (NetconfHelloMessage)netconfMessage);
+ changeState(State.ESTABLISHED);
+ S session = getSession(sessionListener, channel, netconfMessage);
- negotiationSuccessful(session);
- } else {
- final IllegalStateException cause = new IllegalStateException(
- "Received message was not hello as expected, but was " + XmlUtil.toString(doc));
- logger.warn("Negotiation of netconf session failed", cause);
- negotiationFailed(cause);
- }
+ negotiationSuccessful(session);
}
/**
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
- new NetconfMessageAggregator(FramingMechanism.CHUNK));
- channel.pipeline().addAfter(AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
- CHUNK_DECODER_CHANNEL_HANDLER_KEY, new NetconfMessageChunkDecoder());
+ new NetconfChunkAggregator());
}
private boolean shouldUseChunkFraming(Document doc) {
}
private static boolean isStateChangePermitted(State state, State newState) {
- if (state == State.IDLE && newState == State.OPEN_WAIT)
+ if (state == State.IDLE && newState == State.OPEN_WAIT) {
return true;
- if (state == State.OPEN_WAIT && newState == State.ESTABLISHED)
+ }
+ if (state == State.OPEN_WAIT && newState == State.ESTABLISHED) {
return true;
- if (state == State.OPEN_WAIT && newState == State.FAILED)
+ }
+ if (state == State.OPEN_WAIT && newState == State.FAILED) {
return true;
+ }
logger.debug("Transition from {} to {} is not allowed", state, newState);
return false;
}
+
+ /**
+ * Handler to catch exceptions in pipeline during negotiation
+ */
+ private final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ logger.warn("An exception occurred during negotiation on channel {}", channel.localAddress(), cause);
+ cancelTimeout();
+ negotiationFailed(cause);
+ changeState(State.FAILED);
+ }
+ }
}
import java.io.IOException;
import java.io.InputStream;
-public class NetconfUtil {
+public final class NetconfUtil {
private static final Logger logger = LoggerFactory.getLogger(NetconfUtil.class);
+ private NetconfUtil() {}
+
public static NetconfMessage createMessage(final File f) {
Preconditions.checkNotNull(f, "File parameter was null");
try {
package org.opendaylight.controller.netconf.util.handler;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
+
+import com.google.common.base.Preconditions;
+
public class ChunkedFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
+ public static final int DEFAULT_CHUNK_SIZE = 8192;
+ public static final int MIN_CHUNK_SIZE = 128;
+ public static final int MAX_CHUNK_SIZE = 16 * 1024 * 1024;
- private NetconfMessageHeader messageHeader = new NetconfMessageHeader();
+ private final int chunkSize;
- private final static int MAX_CHUNK_SIZE = NetconfMessageConstants.MAX_CHUNK_SIZE;
+ public ChunkedFramingMechanismEncoder() {
+ this(DEFAULT_CHUNK_SIZE);
+ }
+
+ public ChunkedFramingMechanismEncoder(int chunkSize) {
+ Preconditions.checkArgument(chunkSize > MIN_CHUNK_SIZE);
+ Preconditions.checkArgument(chunkSize < MAX_CHUNK_SIZE);
+ this.chunkSize = chunkSize;
+ }
+
+ public final int getChunkSize() {
+ return chunkSize;
+ }
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
- while (msg.readableBytes() > MAX_CHUNK_SIZE) {
- ByteBuf chunk = Unpooled.buffer(MAX_CHUNK_SIZE);
- chunk.writeBytes(createChunkHeader(MAX_CHUNK_SIZE));
- chunk.writeBytes(msg.readBytes(MAX_CHUNK_SIZE));
+ while (msg.readableBytes() > chunkSize) {
+ ByteBuf chunk = Unpooled.buffer(chunkSize);
+ chunk.writeBytes(createChunkHeader(chunkSize));
+ chunk.writeBytes(msg.readBytes(chunkSize));
ctx.write(chunk);
}
out.writeBytes(createChunkHeader(msg.readableBytes()));
out.writeBytes(msg.readBytes(msg.readableBytes()));
- out.writeBytes(NetconfMessageConstants.endOfChunk);
+ out.writeBytes(NetconfMessageConstants.END_OF_CHUNK);
}
private ByteBuf createChunkHeader(int chunkSize) {
- messageHeader.setLength(chunkSize);
- return Unpooled.wrappedBuffer(messageHeader.toBytes());
+ return Unpooled.wrappedBuffer(NetconfMessageHeader.toBytes(chunkSize));
}
-
}
package org.opendaylight.controller.netconf.util.handler;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-public class EOMFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
-
- private byte[] eom = NetconfMessageConstants.endOfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+public class EOMFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
out.writeBytes(msg);
- out.writeBytes(eom);
+ out.writeBytes(NetconfMessageConstants.END_OF_MESSAGE);
}
-
}
public class FramingMechanismHandlerFactory {
- private final static Logger logger = LoggerFactory.getLogger(FramingMechanismHandlerFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(FramingMechanismHandlerFactory.class);
+
+ private FramingMechanismHandlerFactory() {}
public static MessageToByteEncoder<ByteBuf> createHandler(FramingMechanism framingMechanism) {
logger.debug("{} framing mechanism was selected.", framingMechanism);
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfChunkAggregator extends ByteToMessageDecoder {
+ private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
+ public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
+
+ private static enum State {
+ HEADER_ONE, // \n
+ HEADER_TWO, // #
+ HEADER_LENGTH_FIRST, // [1-9]
+ HEADER_LENGTH_OTHER, // [0-9]*\n
+ DATA,
+ FOOTER_ONE, // \n
+ FOOTER_TWO, // #
+ FOOTER_THREE, // #
+ FOOTER_FOUR, // \n
+ }
+
+ private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
+ private State state = State.HEADER_ONE;
+ private long chunkSize;
+ private ByteBuf chunk;
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ while (in.isReadable()) {
+ switch (state) {
+ case HEADER_ONE:
+ {
+ final byte b = in.readByte();
+ if (b != '\n') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+ throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
+ }
+
+ state = State.HEADER_TWO;
+ break;
+ }
+ case HEADER_TWO:
+ {
+ final byte b = in.readByte();
+ if (b != '#') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+ throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
+ }
+
+ state = State.HEADER_LENGTH_FIRST;
+ break;
+ }
+ case HEADER_LENGTH_FIRST:
+ {
+ final byte b = in.readByte();
+ if (b < '1' || b > '9') {
+ logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
+ throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+ }
+
+ chunkSize = b - '0';
+ state = State.HEADER_LENGTH_OTHER;
+ break;
+ }
+ case HEADER_LENGTH_OTHER:
+ {
+ final byte b = in.readByte();
+ if (b == '\n') {
+ state = State.DATA;
+ break;
+ }
+
+ if (b < '0' || b > '9') {
+ logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'0', (byte)'9');
+ throw new IllegalStateException("Invalid chunk size encountered");
+ }
+
+ chunkSize *= 10;
+ chunkSize += b - '0';
+
+ if (chunkSize > maxChunkSize) {
+ logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
+ throw new IllegalStateException("Maximum chunk size exceeded");
+ }
+ break;
+ }
+ case DATA:
+ /*
+ * FIXME: this gathers all data into one big chunk before passing
+ * it on. Make sure the pipeline can work with partial data
+ * and then change this piece to pass the data on as it
+ * comes through.
+ */
+ if (in.readableBytes() < chunkSize) {
+ logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
+ in.discardReadBytes();
+ return;
+ }
+
+ chunk = in.readBytes((int)chunkSize);
+ state = State.FOOTER_ONE;
+ break;
+ case FOOTER_ONE:
+ {
+ final byte b = in.readByte();
+ if (b != '\n') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
+ }
+
+ state = State.FOOTER_TWO;
+ break;
+ }
+ case FOOTER_TWO:
+ {
+ final byte b = in.readByte();
+ if (b != '#') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
+ }
+
+ state = State.FOOTER_THREE;
+ break;
+ }
+ case FOOTER_THREE:
+ {
+ final byte b = in.readByte();
+ if (b != '#') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
+ }
+
+ state = State.FOOTER_FOUR;
+ break;
+ }
+ case FOOTER_FOUR:
+ {
+ final byte b = in.readByte();
+ if (b != '\n') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
+ }
+
+ state = State.HEADER_ONE;
+ out.add(chunk);
+ chunkSize = 0;
+ chunk = null;
+ break;
+ }
+ }
+ }
+
+ in.discardReadBytes();
+ }
+}
package org.opendaylight.controller.netconf.util.handler;
-import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
-import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
-public class NetconfMessageAggregator extends ByteToMessageDecoder {
-
- private final static Logger logger = LoggerFactory.getLogger(NetconfMessageAggregator.class);
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private byte[] eom = NetconfMessageConstants.endOfMessage;
+import com.google.common.base.Charsets;
- public NetconfMessageAggregator(FramingMechanism framingMechanism) {
- if (framingMechanism == FramingMechanism.CHUNK) {
- eom = NetconfMessageConstants.endOfChunk;
- }
- }
+public class NetconfEOMAggregator extends ByteToMessageDecoder {
+ private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- int index = indexOfSequence(in, eom);
+ int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
if (index == -1) {
logger.debug("Message is not complete, read again.");
if (logger.isTraceEnabled()) {
ctx.read();
} else {
ByteBuf msg = in.readBytes(index);
- in.readBytes(eom.length);
+ in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
in.discardReadBytes();
logger.debug("Message is complete.");
out.add(msg);
*/
package org.opendaylight.controller.netconf.util.handler;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+
+import javax.xml.transform.TransformerException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
* </pre>
*/
public final class NetconfHelloMessageToXMLEncoder extends NetconfMessageToXMLEncoder {
-
@Override
- protected ByteBuffer encodeMessage(NetconfMessage msg) {
+ @VisibleForTesting
+ public void encode(ChannelHandlerContext ctx, NetconfMessage msg, ByteBuf out) throws IOException, TransformerException {
Preconditions.checkState(msg instanceof NetconfHelloMessage, "Netconf message of type %s expected, was %s",
NetconfHelloMessage.class, msg.getClass());
Optional<NetconfHelloMessageAdditionalHeader> headerOptional = ((NetconfHelloMessage) msg)
// If additional header present, serialize it along with netconf hello
// message
if (headerOptional.isPresent()) {
- byte[] bytesFromHeader = headerOptional.get().toFormattedString().getBytes(Charsets.UTF_8);
- byte[] bytesFromMessage = xmlToString(msg.getDocument()).getBytes(Charsets.UTF_8);
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(bytesFromHeader.length + bytesFromMessage.length)
- .put(bytesFromHeader).put(bytesFromMessage);
- byteBuffer.flip();
- return byteBuffer;
+ out.writeBytes(headerOptional.get().toFormattedString().getBytes(Charsets.UTF_8));
}
- return super.encodeMessage(msg);
+ super.encode(ctx, msg, out);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.util.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NetconfMessageChunkDecoder extends ByteToMessageDecoder {
-
- private final static Logger logger = LoggerFactory.getLogger(NetconfMessageChunkDecoder.class);
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- ByteBuf byteBufMsg = Unpooled.buffer(in.readableBytes());
- int chunkSize = -1;
- boolean isParsed = false;
- while (in.isReadable()) {
- try {
- if (!isParsed) {
- chunkSize = readHeader(in);
- isParsed = true;
- }
- if (chunkSize != -1 && isParsed) {
- in.readBytes(byteBufMsg, chunkSize);
- isParsed = false;
- } else {
- throw new NetconfDeserializerException("Unable to parse chunked data or header.");
- }
- } catch (Exception e) {
- logger.error("Failed to decode chunked message.", e);
- this.exceptionCaught(ctx, e);
- }
- }
- out.add(byteBufMsg);
- isParsed = false;
- }
-
- private int readHeader(ByteBuf in) {
- ByteBuf chunkSize = Unpooled.buffer(NetconfMessageConstants.MIN_HEADER_LENGTH,
- NetconfMessageConstants.MAX_HEADER_LENGTH);
- byte b = in.readByte();
- if (b != 10)
- return -1;
- b = in.readByte();
- if (b != 35)
- return -1;
- while ((b = in.readByte()) != 10) {
- chunkSize.writeByte(b);
- }
- return Integer.parseInt(chunkSize.toString(Charset.forName("UTF-8")));
- }
-
-}
package org.opendaylight.controller.netconf.util.handler;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Comment;
-import org.w3c.dom.Document;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
import com.google.common.base.Optional;
public class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMessage> {
private static final Logger LOG = LoggerFactory.getLogger(NetconfMessageToXMLEncoder.class);
+ private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
private final Optional<String> clientId;
@Override
@VisibleForTesting
- public void encode(ChannelHandlerContext ctx, NetconfMessage msg, ByteBuf out) throws Exception {
+ public void encode(ChannelHandlerContext ctx, NetconfMessage msg, ByteBuf out) throws IOException, TransformerException {
LOG.debug("Sent to encode : {}", msg);
if (clientId.isPresent()) {
msg.getDocument().appendChild(comment);
}
- final ByteBuffer msgBytes = encodeMessage(msg);
-
- LOG.trace("Putting message \n{}", xmlToString(msg.getDocument()));
- out.writeBytes(msgBytes);
- }
-
- protected ByteBuffer encodeMessage(NetconfMessage msg) {
- return Charsets.UTF_8.encode(xmlToString(msg.getDocument()));
- }
+ try (OutputStream os = new ByteBufOutputStream(out)) {
+ Transformer transformer = FACTORY.newTransformer();
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
- protected String xmlToString(Document doc) {
- return XmlUtil.toString(doc, false);
+ StreamResult result = new StreamResult(new OutputStreamWriter(os));
+ DOMSource source = new DOMSource(msg.getDocument());
+ transformer.transform(source, result);
+ }
}
}
*/
package org.opendaylight.controller.netconf.util.handler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
* {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage}
* . Used by netconf server to retrieve information about session metadata.
*/
-public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder {
+public final class NetconfXMLToHelloMessageDecoder extends ByteToMessageDecoder {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToHelloMessageDecoder.class);
private static final List<byte[]> POSSIBLE_ENDS = ImmutableList.of(
new byte[] { ']', '\n' },
new byte[] { '\r', '\n', '[' },
new byte[] { '\n', '[' });
- private String additionalHeaderCache;
-
@Override
- protected byte[] preprocessMessageBytes(byte[] bytes) {
- // Extract bytes containing header with additional metadata
-
- if (startsWithAdditionalHeader(bytes)) {
- // Auth information containing username, ip address... extracted for monitoring
- int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
- if (endOfAuthHeader > -1) {
- byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
- additionalHeaderCache = additionalHeaderToString(additionalHeaderBytes);
- bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
- }
+ @VisibleForTesting
+ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ if (in.readableBytes() == 0) {
+ LOG.debug("No more content in incoming buffer.");
+ return;
}
- return bytes;
- }
+ in.markReaderIndex();
+ try {
+ LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in));
+ byte[] bytes = new byte[in.readableBytes()];
+ in.readBytes(bytes);
+
+ logMessage(bytes);
+
+ // Extract bytes containing header with additional metadata
+ String additionalHeader = null;
+ if (startsWithAdditionalHeader(bytes)) {
+ // Auth information containing username, ip address... extracted for monitoring
+ int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
+ if (endOfAuthHeader > -1) {
+ byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
+ additionalHeader = additionalHeaderToString(additionalHeaderBytes);
+ bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
+ }
+ }
- @Override
- protected void cleanUpAfterDecode() {
- additionalHeaderCache = null;
- }
+ Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
- @Override
- protected NetconfMessage buildNetconfMessage(Document doc) {
- return new NetconfHelloMessage(doc, additionalHeaderCache == null ? null
- : NetconfHelloMessageAdditionalHeader.fromString(additionalHeaderCache));
+ final NetconfMessage message;
+ if (additionalHeader != null) {
+ message = new NetconfHelloMessage(doc, NetconfHelloMessageAdditionalHeader.fromString(additionalHeader));
+ } else {
+ message = new NetconfHelloMessage(doc);
+ }
+ out.add(message);
+ } finally {
+ in.discardReadBytes();
+ }
}
private int getAdditionalHeaderEndIndex(byte[] bytes) {
return -1;
}
+
+ private void logMessage(byte[] bytes) {
+ String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
+ LOG.debug("Parsing message \n{}", s);
+ }
+
private boolean startsWithAdditionalHeader(byte[] bytes) {
for (byte[] possibleStart : POSSIBLE_STARTS) {
int i = 0;
for (byte b : possibleStart) {
- if(bytes[i++] != b)
+ if(bytes[i++] != b) {
break;
+ }
- if(i == possibleStart.length)
+ if(i == possibleStart.length) {
return true;
+ }
}
}
*/
package org.opendaylight.controller.netconf.util.handler;
-import java.io.ByteArrayInputStream;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
import java.util.List;
-import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-public class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
+public final class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
@Override
@VisibleForTesting
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- if (in.readableBytes() == 0) {
- LOG.debug("No more content in incoming buffer.");
- return;
- }
-
- in.markReaderIndex();
- try {
+ if (in.readableBytes() != 0) {
LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in));
- byte[] bytes = new byte[in.readableBytes()];
- in.readBytes(bytes);
-
- logMessage(bytes);
-
- bytes = preprocessMessageBytes(bytes);
- NetconfMessage message;
- try {
- Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
- message = buildNetconfMessage(doc);
- } catch (Exception e) {
- throw new NetconfDeserializerException("Could not parse message from " + new String(bytes), e);
- }
-
- out.add(message);
- } finally {
- in.discardReadBytes();
- cleanUpAfterDecode();
+ out.add(new NetconfMessage(XmlUtil.readXmlToDocument(new ByteBufInputStream(in))));
+ } else {
+ LOG.debug("No more content in incoming buffer.");
}
}
-
- protected void cleanUpAfterDecode() {}
-
- protected NetconfMessage buildNetconfMessage(Document doc) {
- return new NetconfMessage(doc);
- }
-
- protected byte[] preprocessMessageBytes(byte[] bytes) {
- return bytes;
- }
-
- private void logMessage(byte[] bytes) {
- String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
- LOG.debug("Parsing message \n{}", s);
- }
-
}
* stops at instance of this class. All downstream events are handed of to wrapped {@link org.opendaylight.controller.netconf.util.handler.ssh.client.SshClientAdapter};
*/
public class SshHandler extends ChannelOutboundHandlerAdapter {
+ private static final String SOCKET = "socket";
+
private final VirtualSocket virtualSocket = new VirtualSocket();
private final SshClientAdapter sshClientAdapter;
@Override
public void handlerAdded(ChannelHandlerContext ctx){
- if (ctx.channel().pipeline().get("socket") == null) {
- ctx.channel().pipeline().addFirst("socket", virtualSocket);
+ if (ctx.channel().pipeline().get(SOCKET) == null) {
+ ctx.channel().pipeline().addFirst(SOCKET, virtualSocket);
}
}
@Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- if (ctx.channel().pipeline().get("socket") != null) {
- ctx.channel().pipeline().remove("socket");
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ if (ctx.channel().pipeline().get(SOCKET) != null) {
+ ctx.channel().pipeline().remove(SOCKET);
}
}
@Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
this.sshClientAdapter.write((ByteBuf) msg);
}
public void connect(final ChannelHandlerContext ctx,
SocketAddress remoteAddress,
SocketAddress localAddress,
- ChannelPromise promise) throws Exception {
+ ChannelPromise promise) {
ctx.connect(remoteAddress, localAddress, promise);
promise.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ public void operationComplete(ChannelFuture channelFuture) {
sshClientAdapter.start(ctx);
}}
);
}
@Override
- public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
sshClientAdapter.stop(promise);
}
}
public void authenticate(Connection connection) throws IOException {
boolean isAuthenticated = connection.authenticateWithPassword(username, password);
- if (isAuthenticated == false)
+ if (isAuthenticated == false) {
throw new IOException("Authentication failed.");
+ }
}
}
private Invoker(){}
protected boolean isInvoked() {
+ // TODO invoked is always false
return invoked;
}
return new Invoker() {
@Override
void invoke(SshSession session) throws IOException {
- if (isInvoked() == true) throw new IllegalStateException("Already invoked.");
+ if (isInvoked()) {
+ throw new IllegalStateException("Already invoked.");
+ }
session.startSubSystem(subsystem);
}
*/
public class SshClient {
private final VirtualSocket socket;
- private final Map<Integer, SshSession> openSessions = new HashMap();
+ private final Map<Integer, SshSession> openSessions = new HashMap<>();
private final AuthenticationHandler authenticationHandler;
private Connection connection;
}
public SshSession openSession() throws IOException {
- if (connection == null)
+ if (connection == null) {
connect();
+ }
Session session = connection.openSession();
SshSession sshSession = new SshSession(session);
public void closeSession(SshSession session) {
if (session.getState() == Channel.STATE_OPEN || session.getState() == Channel.STATE_OPENING) {
- session.session.close();
+ session.close();
}
}
openSessions.clear();
- if (connection != null)
+ if (connection != null) {
connection.close();
+ }
}
}
* pipeline.
*/
public class SshClientAdapter implements Runnable {
+ private static final int BUFFER_SIZE = 1024;
+
private final SshClient sshClient;
private final Invoker invoker;
- private SshSession session;
- private InputStream stdOut;
- private InputStream stdErr;
private OutputStream stdIn;
- private Queue<ByteBuf> postponned = new LinkedList<>();
-
+ private Queue<ByteBuf> postponed = new LinkedList<>();
private ChannelHandlerContext ctx;
private ChannelPromise disconnectPromise;
public void run() {
try {
- session = sshClient.openSession();
+ SshSession session = sshClient.openSession();
invoker.invoke(session);
- stdOut = session.getStdout();
- stdErr = session.getStderr();
+ InputStream stdOut = session.getStdout();
+ session.getStderr();
synchronized (lock) {
stdIn = session.getStdin();
- ByteBuf message = null;
- while ((message = postponned.poll()) != null) {
+ ByteBuf message;
+ while ((message = postponed.poll()) != null) {
writeImpl(message);
}
}
while (stopRequested.get() == false) {
- byte[] readBuff = new byte[1024];
+ byte[] readBuff = new byte[BUFFER_SIZE];
int c = stdOut.read(readBuff);
if (c == -1) {
continue;
sshClient.close();
synchronized (lock) {
- if (disconnectPromise != null)
+ if (disconnectPromise != null) {
ctx.disconnect(disconnectPromise);
+ }
}
}
}
public void write(ByteBuf message) throws IOException {
synchronized (lock) {
if (stdIn == null) {
- postponned.add(message);
+ postponed.add(message);
return;
}
writeImpl(message);
}
public void start(ChannelHandlerContext ctx) {
- if (this.ctx != null)
- return; // context is already associated.
+ if (this.ctx != null) {
+ // context is already associated.
+ return;
+ }
this.ctx = ctx;
new Thread(this).start();
}
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Wrapper class for proprietary SSH sessions implementations
*/
-public class SshSession {
- final Session session;
+public class SshSession implements Closeable {
+ private final Session session;
public SshSession(Session session) {
this.session = session;
return session.waitUntilDataAvailable(timeout);
}
- public int waitForCondition(int condition_set, long timeout) {
- return session.waitForCondition(condition_set, timeout);
+ public int waitForCondition(int conditionSet, long timeout) {
+ return session.waitForCondition(conditionSet, timeout);
}
public Integer getExitStatus() {
public String getExitSignal() {
return session.getExitSignal();
}
+
+ @Override
+ public void close() {
+ session.close();
+ }
}
b[off] = (byte)c;
- if(this.bb.readableBytes() == 0) return bytesRead;
+ if(this.bb.readableBytes() == 0) {
+ return bytesRead;
+ }
int ltr = len-1;
ltr = (ltr <= bb.readableBytes()) ? ltr : bb.readableBytes();
}
}
- public void channelRegistered(ChannelHandlerContext ctx)
- throws Exception {
+ public void channelRegistered(ChannelHandlerContext ctx) {
ctx.fireChannelRegistered();
}
- public void channelUnregistered(ChannelHandlerContext ctx)
- throws Exception {
+ public void channelUnregistered(ChannelHandlerContext ctx) {
ctx.fireChannelUnregistered();
}
- public void channelActive(ChannelHandlerContext ctx)
- throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}
- public void channelInactive(ChannelHandlerContext ctx)
- throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
ctx.fireChannelInactive();
}
- public void channelRead(ChannelHandlerContext ctx, Object o)
- throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object o) {
synchronized(lock) {
this.bb.discardReadBytes();
this.bb.writeBytes((ByteBuf) o);
}
}
- public void channelReadComplete(ChannelHandlerContext ctx)
- throws Exception {
+ public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
}
- public void userEventTriggered(ChannelHandlerContext ctx, Object o)
- throws Exception {
+ public void userEventTriggered(ChannelHandlerContext ctx, Object o) {
ctx.fireUserEventTriggered(o);
}
- public void channelWritabilityChanged(ChannelHandlerContext ctx)
- throws Exception {
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) {
ctx.fireChannelWritabilityChanged();
}
- public void handlerAdded(ChannelHandlerContext ctx)
- throws Exception {
+ public void handlerAdded(ChannelHandlerContext ctx) {
}
- public void handlerRemoved(ChannelHandlerContext ctx)
- throws Exception {
+ public void handlerRemoved(ChannelHandlerContext ctx) {
}
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
ctx.fireExceptionCaught(throwable);
}
}
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
-import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketAddress;
private ChannelHandlerContext ctx;
@Override
- public void flush() throws IOException {
+ public void flush() {
synchronized(lock) {
ctx.writeAndFlush(buff).awaitUninterruptibly();
buff = Unpooled.buffer();
}
@Override
- public void write(int b) throws IOException {
+ public void write(int b) {
synchronized(lock) {
buff.writeByte(b);
}
}
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
- ChannelPromise promise) throws Exception {
+ ChannelPromise promise) {
ctx.bind(localAddress, promise);
}
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
- SocketAddress localAddress, ChannelPromise promise)
- throws Exception {
+ SocketAddress localAddress, ChannelPromise promise) {
this.ctx = ctx;
ctx.connect(remoteAddress, localAddress, promise);
}
- public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
- throws Exception {
+ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.disconnect(promise);
}
- public void close(ChannelHandlerContext ctx, ChannelPromise promise)
- throws Exception {
+ public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.close(promise);
}
- public void deregister(ChannelHandlerContext ctx, ChannelPromise channelPromise)
- throws Exception {
+ public void deregister(ChannelHandlerContext ctx, ChannelPromise channelPromise) {
ctx.deregister(channelPromise);
}
- public void read(ChannelHandlerContext ctx)
- throws Exception {
+ public void read(ChannelHandlerContext ctx) {
ctx.read();
}
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
- throws Exception {
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// pass
}
- public void flush(ChannelHandlerContext ctx)
- throws Exception {
+ public void flush(ChannelHandlerContext ctx) {
// pass
}
throws Exception {
}
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
}
* are able to use full potential of NIO environment.
*/
public class VirtualSocket extends Socket implements ChannelHandler {
+ private static final String INPUT_STREAM = "inputStream";
+ private static final String OUTPUT_STREAM = "outputStream";
+
private final ChannelInputStream chis = new ChannelInputStream();
private final ChannelOutputStream chos = new ChannelOutputStream();
private ChannelHandlerContext ctx;
return this.chos;
}
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
- if (ctx.channel().pipeline().get("outputStream") == null) {
- ctx.channel().pipeline().addFirst("outputStream", chos);
+ if (ctx.channel().pipeline().get(OUTPUT_STREAM) == null) {
+ ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chos);
}
- if (ctx.channel().pipeline().get("inputStream") == null) {
- ctx.channel().pipeline().addFirst("inputStream", chis);
+ if (ctx.channel().pipeline().get(INPUT_STREAM) == null) {
+ ctx.channel().pipeline().addFirst(INPUT_STREAM, chis);
}
}
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- if (ctx.channel().pipeline().get("outputStream") != null) {
- ctx.channel().pipeline().remove("outputStream");
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ if (ctx.channel().pipeline().get(OUTPUT_STREAM) != null) {
+ ctx.channel().pipeline().remove(OUTPUT_STREAM);
}
- if (ctx.channel().pipeline().get("inputStream") != null) {
- ctx.channel().pipeline().remove("inputStream");
+ if (ctx.channel().pipeline().get(INPUT_STREAM) != null) {
+ ctx.channel().pipeline().remove(INPUT_STREAM);
}
}
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
+ // TODO exceptionCaught is deprecated transform this handler
ctx.fireExceptionCaught(throwable);
}
public InetAddress getInetAddress() {
InetSocketAddress isa = getInetSocketAddress();
- if (isa == null) throw new VirtualSocketException();
+ if (isa == null) {
+ throw new VirtualSocketException();
+ }
return getInetSocketAddress().getAddress();
}
* session's connection. Provided information can be reported via netconf
* monitoring.
* <pre>
- * It has pattern "[username; host-address:port; transport; session-identifier;]"
+ * It has PATTERN "[username; host-address:port; transport; session-identifier;]"
* username - name of account on a remote
* host-address - client's IP address
* port - port number
}
// TODO IPv6
- private static final Pattern pattern = Pattern
+ private static final Pattern PATTERN = Pattern
.compile("\\[(?<username>[^;]+);(?<address>[0-9\\.]+)[:/](?<port>[0-9]+);(?<transport>[a-z]+)[^\\]]+\\]");
- private static final Pattern customHeaderPattern = Pattern
+ private static final Pattern CUSTOM_HEADER_PATTERN = Pattern
.compile("\\[(?<username>[^;]+);(?<address>[0-9\\.]+)[:/](?<port>[0-9]+);(?<transport>[a-z]+);(?<sessionIdentifier>[a-z]+)[^\\]]+\\]");
/**
* Parse additional header from a formatted string
*/
public static NetconfHelloMessageAdditionalHeader fromString(String additionalHeader) {
- additionalHeader = additionalHeader.trim();
- Matcher matcher = pattern.matcher(additionalHeader);
- Matcher matcher2 = customHeaderPattern.matcher(additionalHeader);
+ String additionalHeaderTrimmed = additionalHeader.trim();
+ Matcher matcher = PATTERN.matcher(additionalHeaderTrimmed);
+ Matcher matcher2 = CUSTOM_HEADER_PATTERN.matcher(additionalHeaderTrimmed);
Preconditions.checkArgument(matcher.matches(), "Additional header in wrong format %s, expected %s",
- additionalHeader, pattern);
+ additionalHeaderTrimmed, PATTERN);
String username = matcher.group("username");
String address = matcher.group("address");
import com.google.common.base.Charsets;
public class NetconfMessageConstants {
+ /**
+ * The NETCONF 1.0 old-style message separator. This is framing mechanism
+ * is used by default.
+ */
+ public static final byte[] END_OF_MESSAGE = "]]>]]>".getBytes(Charsets.UTF_8);
- public static final byte[] endOfMessage = "]]>]]>".getBytes(Charsets.UTF_8);
+ // bytes
- public static final byte[] endOfChunk = "\n##\n".getBytes(Charsets.UTF_8);
+ public static final int MIN_HEADER_LENGTH = 4;
- public static final int MAX_CHUNK_SIZE = 1024; // bytes
+ // bytes
- public static final int MIN_HEADER_LENGTH = 4; // bytes
+ public static final int MAX_HEADER_LENGTH = 13;
- public static final int MAX_HEADER_LENGTH = 13; // bytes
-}
\ No newline at end of file
+ public static final byte[] END_OF_CHUNK = "\n##\n".getBytes(Charsets.UTF_8);
+}
package org.opendaylight.controller.netconf.util.messages;
+import java.nio.ByteBuffer;
+
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
-import java.nio.ByteBuffer;
-
/**
* Netconf message header is used only when chunked framing mechanism is
* supported. The header consists of only the length field.
*/
+@Deprecated
public final class NetconfMessageHeader {
-
- private long length;
-
// \n#<length>\n
- private static final byte[] headerBegin = new byte[] { (byte) 0x0a, (byte) 0x23 };
+ private static final byte[] HEADER_START = new byte[] { (byte) 0x0a, (byte) 0x23 };
+ private static final byte HEADER_END = (byte) 0x0a;
+ private final long length;
- private static final byte headerEnd = (byte) 0x0a;
-
- private boolean parsed = false;
-
- public NetconfMessageHeader() {
-
- }
-
- public NetconfMessageHeader fromBytes(final byte[] bytes) {
- // the length is variable therefore bytes between headerBegin and
- // headerEnd mark the length
- // the length should be only numbers and therefore easily parsed with
- // ASCII
- this.length = Long.parseLong(Charsets.US_ASCII.decode(
- ByteBuffer.wrap(bytes, headerBegin.length, bytes.length - headerBegin.length - 1)).toString());
- Preconditions.checkState(this.length < Integer.MAX_VALUE && this.length > 0);
- this.parsed = true;
- return this;
+ public NetconfMessageHeader(final long length) {
+ Preconditions.checkArgument(length < Integer.MAX_VALUE && length > 0);
+ this.length = length;
}
public byte[] toBytes() {
- final byte[] l = String.valueOf(this.length).getBytes(Charsets.US_ASCII);
- final byte[] h = new byte[headerBegin.length + l.length + 1];
- System.arraycopy(headerBegin, 0, h, 0, headerBegin.length);
- System.arraycopy(l, 0, h, headerBegin.length, l.length);
- System.arraycopy(new byte[] { headerEnd }, 0, h, headerBegin.length + l.length, 1);
- return h;
+ return toBytes(this.length);
}
// FIXME: improve precision to long
return (int) this.length;
}
- public void setLength(final int length) {
- this.length = length;
- }
+ public static NetconfMessageHeader fromBytes(final byte[] bytes) {
+ // the length is variable therefore bytes between headerBegin and
+ // headerEnd mark the length
+ // the length should be only numbers and therefore easily parsed with
+ // ASCII
+ long length = Long.parseLong(Charsets.US_ASCII.decode(
+ ByteBuffer.wrap(bytes, HEADER_START.length, bytes.length - HEADER_START.length - 1)).toString());
- /**
- * @return the parsed
- */
- public boolean isParsed() {
- return this.parsed;
+ return new NetconfMessageHeader(length);
}
- /**
- * @param parsed
- * the parsed to set
- */
- public void setParsed() {
- this.parsed = false;
+ public static byte[] toBytes(final long length) {
+ final byte[] l = String.valueOf(length).getBytes(Charsets.US_ASCII);
+ final byte[] h = new byte[HEADER_START.length + l.length + 1];
+ System.arraycopy(HEADER_START, 0, h, 0, HEADER_START.length);
+ System.arraycopy(l, 0, h, HEADER_START.length, l.length);
+ System.arraycopy(new byte[] { HEADER_END }, 0, h, HEADER_START.length + l.length, 1);
+ return h;
}
}
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.w3c.dom.Document;
-public class NetconfMessageUtil {
+public final class NetconfMessageUtil {
+
+ private NetconfMessageUtil() {}
public static boolean isOKMessage(NetconfMessage message) {
return isOKMessage(message.getDocument());
import java.io.InputStream;
import java.util.Map.Entry;
-public class SendErrorExceptionUtil {
+public final class SendErrorExceptionUtil {
private static final Logger logger = LoggerFactory.getLogger(SendErrorExceptionUtil.class);
+ private SendErrorExceptionUtil() {}
+
public static void sendErrorMessage(final NetconfSession session,
final NetconfDocumentedException sendErrorException) {
logger.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
for (int i = 0; i < incomingAttributes.getLength(); i++) {
final Attr attr = (Attr) incomingAttributes.item(i);
// skip namespace
- if (attr.getNodeName().equals(XmlUtil.XMLNS_ATTRIBUTE_KEY))
+ if (attr.getNodeName().equals(XmlUtil.XMLNS_ATTRIBUTE_KEY)) {
continue;
+ }
rpcReply.setAttributeNode((Attr) errorDocument.importNode(attr, true));
}
} catch (final Exception e) {
import static com.google.common.base.Preconditions.checkNotNull;
-public class NetconfConfigUtil {
+public final class NetconfConfigUtil {
private static final Logger logger = LoggerFactory.getLogger(NetconfConfigUtil.class);
private static final String PREFIX_PROP = "netconf.";
-
+ private NetconfConfigUtil() {}
private enum InfixProp {
tcp, ssh
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
-public class XMLNetconfUtil {
+public final class XMLNetconfUtil {
+
+ private XMLNetconfUtil() {}
public static XPathExpression compileXPath(String xPath) {
final XPathFactory xPathfactory = XPathFactory.newInstance();
import java.util.List;
import java.util.Map;
-public class XmlElement {
+public final class XmlElement {
- public final Element element;
+ private final Element element;
private XmlElement(Element element) {
this.element = element;
public void appendChild(Element element) {
this.element.appendChild(element);
- // Element newElement = (Element) element.cloneNode(true);
- // newElement.appendChild(configElement);
- // return XmlElement.fromDomElement(newElement);
}
public Element getDomElement() {
final List<XmlElement> result = new ArrayList<>();
for (int i = 0; i < childNodes.getLength(); i++) {
Node item = childNodes.item(i);
- if (item instanceof Element == false)
+ if (item instanceof Element == false) {
continue;
- if (strat.accept((Element) item))
+ }
+ if (strat.accept((Element) item)) {
result.add(new XmlElement((Element) item));
+ }
}
return result;
public String getNamespace() {
String namespaceURI = element.getNamespaceURI();
Preconditions.checkState(namespaceURI != null, "No namespace defined for %s", this);
- return namespaceURI.toString();
+ return namespaceURI;
}
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer("XmlElement{");
+ final StringBuilder sb = new StringBuilder("XmlElement{");
sb.append("name='").append(getName()).append('\'');
if (element.getNamespaceURI() != null) {
sb.append(", namespace='").append(getNamespace()).append('\'');
public Map.Entry<String/* prefix */, String/* namespace */> findNamespaceOfTextContent() {
Map<String, String> namespaces = extractNamespaces(element);
String textContent = getTextContent();
- int indexOfColon = textContent.indexOf(":");
+ int indexOfColon = textContent.indexOf(':');
String prefix;
if (indexOfColon > -1) {
prefix = textContent.substring(0, indexOfColon);
@Override
public boolean equals(Object o) {
- if (this == o)
+ if (this == o) {
return true;
- if (o == null || getClass() != o.getClass())
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
+ }
XmlElement that = (XmlElement) o;
- if (!element.isEqualNode(that.element))
+ if (!element.isEqualNode(that.element)) {
return false;
+ }
return true;
}
return true;
}
- private static interface ElementFilteringStrategy {
+ private interface ElementFilteringStrategy {
boolean accept(Element e);
}
}
*/
package org.opendaylight.controller.netconf.util.xml;
-public class XmlNetconfConstants {
+public final class XmlNetconfConstants {
+
+ private XmlNetconfConstants() {}
public static final String MOUNTPOINTS = "mountpoints";
public static final String MOUNTPOINT = "mountpoint";
import com.google.common.base.Preconditions;
-public class XmlNetconfValidator {
- static final Schema schema;
+public final class XmlNetconfValidator {
+
+ private static final Schema SCHEMA;
+
+ private XmlNetconfValidator() {}
static {
final InputStream xmlSchema = XmlNetconfValidator.class.getResourceAsStream("/xml.xsd");
final InputStream rfc4714Schema = XmlNetconfValidator.class.getResourceAsStream("/rfc4741.xsd");
Preconditions.checkNotNull(rfc4714Schema, "Cannot find rfc4741.xsd");
- schema = XmlUtil.loadSchema(xmlSchema, rfc4714Schema);
+ SCHEMA = XmlUtil.loadSchema(xmlSchema, rfc4714Schema);
}
public static void validate(Document inputDocument) throws SAXException, IOException {
- final Validator validator = schema.newValidator();
+ final Validator validator = SCHEMA.newValidator();
final Source source = new DOMSource(inputDocument);
validator.validate(source);
}
import com.google.common.base.Charsets;
-public class XmlUtil {
+public final class XmlUtil {
public static final String XMLNS_ATTRIBUTE_KEY = "xmlns";
+ private static final DocumentBuilderFactory BUILDERFACTORY;
+
+ static {
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ factory.setCoalescing(true);
+ factory.setIgnoringElementContentWhitespace(true);
+ factory.setIgnoringComments(true);
+ BUILDERFACTORY = factory;
+ }
+
+ private XmlUtil() {}
public static Element readXmlToElement(String xmlContent) throws SAXException, IOException {
Document doc = readXmlToDocument(xmlContent);
return readXmlToDocument(new ByteArrayInputStream(xmlContent.getBytes(Charsets.UTF_8)));
}
+ // TODO improve exceptions throwing
+ // along with XmlElement
+
public static Document readXmlToDocument(InputStream xmlContent) throws SAXException, IOException {
- DocumentBuilderFactory factory = getDocumentBuilderFactory();
DocumentBuilder dBuilder;
try {
- dBuilder = factory.newDocumentBuilder();
+ dBuilder = BUILDERFACTORY.newDocumentBuilder();
} catch (ParserConfigurationException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to parse XML document", e);
}
Document doc = dBuilder.parse(xmlContent);
return readXmlToDocument(new FileInputStream(xmlFile)).getDocumentElement();
}
- private static final DocumentBuilderFactory getDocumentBuilderFactory() {
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- factory.setNamespaceAware(true);
- factory.setCoalescing(true);
- factory.setIgnoringElementContentWhitespace(true);
- factory.setIgnoringComments(true);
- return factory;
- }
-
public static Document newDocument() {
- DocumentBuilderFactory factory = getDocumentBuilderFactory();
try {
- DocumentBuilder builder = factory.newDocumentBuilder();
+ DocumentBuilder builder = BUILDERFACTORY.newDocumentBuilder();
Document document = builder.newDocument();
return document;
} catch (ParserConfigurationException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to create document", e);
}
}
try {
Transformer transformer = TransformerFactory.newInstance().newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
- transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, addXmlDeclaration == true ? "no" : "yes");
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, addXmlDeclaration ? "no" : "yes");
StreamResult result = new StreamResult(new StringWriter());
DOMSource source = new DOMSource(xml);
transformer.transform(source, result);
- String xmlString = result.getWriter().toString();
- return xmlString;
+ return result.getWriter().toString();
} catch (IllegalArgumentException | TransformerFactoryConfigurationError | TransformerException e) {
throw new RuntimeException("Unable to serialize xml element " + xml, e);
}
import org.junit.Test;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
-import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import com.google.common.io.Files;
public class NetconfMessageFactoryTest {
@Test
public void testAuth() throws Exception {
- NetconfXMLToMessageDecoder parser = new NetconfXMLToHelloMessageDecoder();
+ NetconfXMLToHelloMessageDecoder parser = new NetconfXMLToHelloMessageDecoder();
File authHelloFile = new File(getClass().getResource("/netconfMessages/client_hello_with_auth.xml").getFile());
final List<Object> out = new ArrayList<>();
<version>1.1-SNAPSHOT</version>
<packaging>bundle</packaging>
- <properties>
- <ganymed.version>build209</ganymed.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.osgi</groupId>