2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.util.handler;
11 import java.util.List;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
16 import io.netty.buffer.ByteBuf;
17 import io.netty.buffer.CompositeByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.ByteToMessageDecoder;
22 public class NetconfChunkAggregator extends ByteToMessageDecoder {
23 private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
24 public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
26 private static enum State {
29 HEADER_LENGTH_FIRST, // [1-9]
30 HEADER_LENGTH_OTHER, // [0-9]*\n
38 private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
39 private State state = State.HEADER_ONE;
40 private long chunkSize;
41 private CompositeByteBuf chunk;
44 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
45 while (in.isReadable()) {
49 final byte b = in.readByte();
51 logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
52 throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
55 state = State.HEADER_TWO;
62 final byte b = in.readByte();
64 logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
65 throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
68 state = State.HEADER_LENGTH_FIRST;
71 case HEADER_LENGTH_FIRST:
73 final byte b = in.readByte();
74 chunkSize = processHeaderLengthFirst(b);
75 state = State.HEADER_LENGTH_OTHER;
78 case HEADER_LENGTH_OTHER:
80 final byte b = in.readByte();
86 if (b < '0' || b > '9') {
87 logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'0', (byte)'9');
88 throw new IllegalStateException("Invalid chunk size encountered");
94 if (chunkSize > maxChunkSize) {
95 logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
96 throw new IllegalStateException("Maximum chunk size exceeded");
102 * FIXME: this gathers all data into one big chunk before passing
103 * it on. Make sure the pipeline can work with partial data
104 * and then change this piece to pass the data on as it
107 if (in.readableBytes() < chunkSize) {
108 logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
109 in.discardReadBytes();
113 aggregateChunks(in.readBytes((int) chunkSize));
114 state = State.FOOTER_ONE;
118 final byte b = in.readByte();
120 logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
121 throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
124 state = State.FOOTER_TWO;
130 final byte b = in.readByte();
133 logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
134 throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
137 state = State.FOOTER_THREE;
142 final byte b = in.readByte();
144 // In this state, either header-of-new-chunk or message-end is expected
145 // Depends on the next character
147 if (isHeaderLengthFirst(b)) {
148 // Extract header length#1 from new chunk
149 chunkSize = processHeaderLengthFirst(b);
150 // Proceed with next chunk processing
151 state = State.HEADER_LENGTH_OTHER;
152 } else if (b == '#') {
153 state = State.FOOTER_FOUR;
155 logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9');
156 throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
163 final byte b = in.readByte();
165 logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
166 throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
169 state = State.HEADER_ONE;
177 in.discardReadBytes();
180 private void initChunk() {
181 chunk = Unpooled.compositeBuffer();
184 private void aggregateChunks(ByteBuf newChunk) {
185 chunk.addComponent(chunk.numComponents(), newChunk);
187 // Update writer index, addComponent does not update it
188 chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
191 private static int processHeaderLengthFirst(byte b) {
192 if (isHeaderLengthFirst(b) == false) {
193 logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
194 throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
200 private static boolean isHeaderLengthFirst(byte b) {
201 return b >= '1' && b <= '9';