/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.nettyutil.handler;
10 import static com.google.common.base.Preconditions.checkArgument;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.CompositeByteBuf;
14 import io.netty.buffer.Unpooled;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import java.util.List;
18 import org.checkerframework.checker.index.qual.NonNegative;
19 import org.opendaylight.netconf.nettyutil.AbstractNetconfSessionNegotiator;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
/**
 * Decoder for chunk-framed input. The byte checks in this class expect each chunk to start with a line feed
 * and a hash, followed by a decimal length whose first digit is 1-9, and each chunk to be followed by a line
 * feed and a hash. After that footer either the length of the next chunk or a second hash plus line feed
 * (end of message) is accepted. Chunk payloads are collected into a composite buffer.
 */
23 public class NetconfChunkAggregator extends ByteToMessageDecoder {
24 private static final Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
// Debug-log templates for an unexpected byte: the first placeholder is the byte received,
// the remaining placeholders are the byte value(s) that were expected instead.
25 private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
26 private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
27 private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM =
28 "Got byte {} while waiting for {}-{}-{}";
/**
 * Maximum chunk size used by the no-argument constructor; mirrors the negotiator's default incoming
 * chunk size.
 */
30 public static final @NonNegative int DEFAULT_MAXIMUM_CHUNK_SIZE =
31 AbstractNetconfSessionNegotiator.DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
// Parser states covering the decimal chunk length in the header: the first digit must be non-zero,
// every following byte is either another decimal digit or the terminating line feed.
36 HEADER_LENGTH_FIRST, // [1-9]
37 HEADER_LENGTH_OTHER, // [0-9]*\n
// Upper bound for a single chunk; checkChunkSize() rejects any parsed size above it.
45 private final int maxChunkSize;
// Current position in the framing state machine; parsing starts at the first header byte.
46 private State state = State.HEADER_ONE;
// Size of the chunk currently being parsed, built up from the header's length digits.
// NOTE(review): presumably declared long so digit accumulation cannot overflow before the bound check - confirm.
47 private long chunkSize;
// Composite buffer that collects the payloads of the chunks read so far.
48 private CompositeByteBuf chunk;
/**
 * Construct an instance whose maximum chunk size is {@link #DEFAULT_MAXIMUM_CHUNK_SIZE}.
 */
public NetconfChunkAggregator() {
    // Delegate to the sized constructor so the size validation lives in one place.
    this(DEFAULT_MAXIMUM_CHUNK_SIZE);
}
/**
 * Construct an instance with specified maximum chunk size.
 *
 * @param maxChunkSize maximum chunk size, must be strictly positive
 * @throws IllegalArgumentException if {@code maxChunkSize} is zero or negative
 */
public NetconfChunkAggregator(final @NonNegative int maxChunkSize) {
    // Validate before assigning any state. Zero is rejected as well as negative values: the first digit of
    // a chunk length must be 1-9, so no chunk could ever satisfy a zero limit.
    checkArgument(maxChunkSize > 0, "Non-positive maximum chunk size %s", maxChunkSize);
    this.maxChunkSize = maxChunkSize;
}
68 private static void checkNewLine(final byte byteToCheck, final String errorMessage) {
69 if (byteToCheck != '\n') {
70 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'\n');
71 throw new IllegalStateException(errorMessage);
75 private static void checkHash(final byte byteToCheck, final String errorMessage) {
76 if (byteToCheck != '#') {
77 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'#');
78 throw new IllegalStateException(errorMessage);
82 private void checkChunkSize() {
83 if (chunkSize > maxChunkSize) {
84 LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
85 throw new IllegalStateException("Chunk size " + chunkSize + " exceeds maximum " + maxChunkSize);
/**
 * Runs the chunk-framing state machine over the readable bytes of {@code in}. Header and footer bytes are
 * read one at a time and validated; each validated byte advances {@code state}. Chunk payloads are handed
 * to {@code aggregateChunks}.
 *
 * @param ctx channel handler context
 * @param in buffer the framed bytes are read from
 * @param out list receiving decoded objects
 * @throws IllegalStateException if a header, length or footer byte is not the one expected
 */
90 protected void decode(final ChannelHandlerContext ctx,
91 final ByteBuf in, final List<Object> out) throws IllegalStateException {
// Keep consuming for as long as the buffer has readable bytes.
92 while (in.isReadable()) {
// First header byte: must be a line feed.
95 final byte b = in.readByte();
96 checkNewLine(b, "Malformed chunk header encountered (byte 0)");
97 state = State.HEADER_TWO;
// Second header byte: must be a hash.
102 final byte b = in.readByte();
103 checkHash(b, "Malformed chunk header encountered (byte 1)");
104 state = State.HEADER_LENGTH_FIRST;
// First length digit: processHeaderLengthFirst() accepts only 1-9 and returns its numeric value.
107 case HEADER_LENGTH_FIRST: {
108 final byte b = in.readByte();
109 chunkSize = processHeaderLengthFirst(b);
110 state = State.HEADER_LENGTH_OTHER;
// Subsequent length bytes: anything outside 0-9 that reaches this check is rejected.
113 case HEADER_LENGTH_OTHER: {
114 final byte b = in.readByte();
119 if (b < '0' || b > '9') {
120 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
121 throw new IllegalStateException("Invalid chunk size encountered");
// Add the numeric value of the digit to the size parsed so far.
124 chunkSize += b - '0';
130 * FIXME: this gathers all data into one big chunk before passing
131 * it on. Make sure the pipeline can work with partial data
132 * and then change this piece to pass the data on as it
// Chunk payload: when fewer than chunkSize bytes are buffered, only log and discard the bytes already read.
135 if (in.readableBytes() < chunkSize) {
136 LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
137 in.discardReadBytes();
// NOTE(review): the narrowing cast assumes chunkSize was already bounded by maxChunkSize - confirm
// that checkChunkSize() runs before this point.
140 aggregateChunks(in.readBytes((int) chunkSize));
141 state = State.FOOTER_ONE;
// First footer byte: must be a line feed.
144 final byte b = in.readByte();
145 checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
146 state = State.FOOTER_TWO;
// Second footer byte: must be a hash.
151 final byte b = in.readByte();
152 checkHash(b,"Malformed chunk footer encountered (byte 1)");
153 state = State.FOOTER_THREE;
// Third footer byte: the helper moves to HEADER_LENGTH_OTHER on a digit 1-9 or to FOOTER_FOUR on a hash.
157 final byte b = in.readByte();
158 // In this state, either header-of-new-chunk or message-end is expected
159 // Depends on the next character
160 extractNewChunkOrMessageEnd(b);
// Final footer byte: must be a line feed; the parser then expects a new header.
164 final byte b = in.readByte();
165 checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
166 state = State.HEADER_ONE;
// Fallback for a state value none of the cases handle.
172 LOG.info("Unknown state.");
// Discard the bytes that have already been read from the buffer.
// NOTE(review): presumably executed once the read loop has finished - confirm placement.
176 in.discardReadBytes();
179 private void extractNewChunkOrMessageEnd(final byte byteToCheck) {
180 if (isHeaderLengthFirst(byteToCheck)) {
181 // Extract header length#1 from new chunk
182 chunkSize = processHeaderLengthFirst(byteToCheck);
183 // Proceed with next chunk processing
184 state = State.HEADER_LENGTH_OTHER;
185 } else if (byteToCheck == '#') {
186 state = State.FOOTER_FOUR;
188 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM, byteToCheck, (byte) '#', (byte) '1', (byte) '9');
189 throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
193 private void initChunk() {
194 chunk = Unpooled.compositeBuffer();
197 private void aggregateChunks(final ByteBuf newChunk) {
198 chunk.addComponent(chunk.numComponents(), newChunk);
200 // Update writer index, addComponent does not update it
201 chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
204 private static int processHeaderLengthFirst(final byte byteToCheck) {
205 if (!isHeaderLengthFirst(byteToCheck)) {
206 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, byteToCheck, (byte)'1', (byte)'9');
207 throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
210 return byteToCheck - '0';
213 private static boolean isHeaderLengthFirst(final byte byteToCheck) {
214 return byteToCheck >= '1' && byteToCheck <= '9';