2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil.handler;
10 import static com.google.common.base.Preconditions.checkArgument;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.CompositeByteBuf;
14 import io.netty.buffer.Unpooled;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import java.util.List;
18 import org.checkerframework.checker.index.qual.NonNegative;
19 import org.opendaylight.netconf.nettyutil.AbstractNetconfSessionNegotiator;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 public class NetconfChunkAggregator extends ByteToMessageDecoder {
24 private static final Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
25 private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
26 private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
27 private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM =
28 "Got byte {} while waiting for {}-{}-{}";
30 @Deprecated(since = "4.0.1", forRemoval = true)
31 public static final @NonNegative int DEFAULT_MAXIMUM_CHUNK_SIZE =
32 AbstractNetconfSessionNegotiator.DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
37 HEADER_LENGTH_FIRST, // [1-9]
38 HEADER_LENGTH_OTHER, // [0-9]*\n
46 private final int maxChunkSize;
47 private State state = State.HEADER_ONE;
48 private long chunkSize;
49 private CompositeByteBuf chunk;
52 * Construct an instance with maximum chunk size set to {@link #DEFAULT_MAXIMUM_CHUNK_SIZE}.
54 * @deprecated Prefer {@link #NetconfChunkAggregator(int)} for fine-grained control.
56 @Deprecated(since = "4.0.1", forRemoval = true)
57 public NetconfChunkAggregator() {
58 this(DEFAULT_MAXIMUM_CHUNK_SIZE);
62 * Construct an instance with specified maximum chunk size.
64 * @param maxChunkSize maximum chunk size
65 * @throws IllegalArgumentException if {@code maxChunkSize} is negative
67 public NetconfChunkAggregator(final @NonNegative int maxChunkSize) {
68 this.maxChunkSize = maxChunkSize;
69 checkArgument(maxChunkSize > 0, "Negative maximum chunk size %s", maxChunkSize);
72 private static void checkNewLine(final byte byteToCheck, final String errorMessage) {
73 if (byteToCheck != '\n') {
74 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'\n');
75 throw new IllegalStateException(errorMessage);
79 private static void checkHash(final byte byteToCheck, final String errorMessage) {
80 if (byteToCheck != '#') {
81 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'#');
82 throw new IllegalStateException(errorMessage);
86 private void checkChunkSize() {
87 if (chunkSize > maxChunkSize) {
88 LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
89 throw new IllegalStateException("Chunk size " + chunkSize + " exceeds maximum " + maxChunkSize);
94 protected void decode(final ChannelHandlerContext ctx,
95 final ByteBuf in, final List<Object> out) throws IllegalStateException {
96 while (in.isReadable()) {
99 final byte b = in.readByte();
100 checkNewLine(b, "Malformed chunk header encountered (byte 0)");
101 state = State.HEADER_TWO;
106 final byte b = in.readByte();
107 checkHash(b, "Malformed chunk header encountered (byte 1)");
108 state = State.HEADER_LENGTH_FIRST;
111 case HEADER_LENGTH_FIRST: {
112 final byte b = in.readByte();
113 chunkSize = processHeaderLengthFirst(b);
114 state = State.HEADER_LENGTH_OTHER;
117 case HEADER_LENGTH_OTHER: {
118 final byte b = in.readByte();
123 if (b < '0' || b > '9') {
124 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
125 throw new IllegalStateException("Invalid chunk size encountered");
128 chunkSize += b - '0';
134 * FIXME: this gathers all data into one big chunk before passing
135 * it on. Make sure the pipeline can work with partial data
136 * and then change this piece to pass the data on as it
139 if (in.readableBytes() < chunkSize) {
140 LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
141 in.discardReadBytes();
144 aggregateChunks(in.readBytes((int) chunkSize));
145 state = State.FOOTER_ONE;
148 final byte b = in.readByte();
149 checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
150 state = State.FOOTER_TWO;
155 final byte b = in.readByte();
156 checkHash(b,"Malformed chunk footer encountered (byte 1)");
157 state = State.FOOTER_THREE;
161 final byte b = in.readByte();
162 // In this state, either header-of-new-chunk or message-end is expected
163 // Depends on the next character
164 extractNewChunkOrMessageEnd(b);
168 final byte b = in.readByte();
169 checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
170 state = State.HEADER_ONE;
176 LOG.info("Unknown state.");
180 in.discardReadBytes();
183 private void extractNewChunkOrMessageEnd(final byte byteToCheck) {
184 if (isHeaderLengthFirst(byteToCheck)) {
185 // Extract header length#1 from new chunk
186 chunkSize = processHeaderLengthFirst(byteToCheck);
187 // Proceed with next chunk processing
188 state = State.HEADER_LENGTH_OTHER;
189 } else if (byteToCheck == '#') {
190 state = State.FOOTER_FOUR;
192 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM, byteToCheck, (byte) '#', (byte) '1', (byte) '9');
193 throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
197 private void initChunk() {
198 chunk = Unpooled.compositeBuffer();
201 private void aggregateChunks(final ByteBuf newChunk) {
202 chunk.addComponent(chunk.numComponents(), newChunk);
204 // Update writer index, addComponent does not update it
205 chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
208 private static int processHeaderLengthFirst(final byte byteToCheck) {
209 if (!isHeaderLengthFirst(byteToCheck)) {
210 LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, byteToCheck, (byte)'1', (byte)'9');
211 throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
214 return byteToCheck - '0';
217 private static boolean isHeaderLengthFirst(final byte byteToCheck) {
218 return byteToCheck >= '1' && byteToCheck <= '9';