Deprecate DEFAULT_MAXIMUM_CHUNK_SIZE
netconf.git: netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfChunkAggregator.java
/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.netconf.nettyutil.handler;

import static com.google.common.base.Preconditions.checkArgument;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import org.checkerframework.checker.index.qual.NonNegative;
import org.opendaylight.netconf.nettyutil.AbstractNetconfSessionNegotiator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

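/**
 * A {@link ByteToMessageDecoder} which reassembles messages sent with the chunked framing mechanism of
 * RFC 6242: each chunk is prefixed by a {@code \n#<length>\n} header and a message is terminated by
 * {@code \n##\n}. Chunks larger than the configured maximum size are rejected.
 */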
public class NetconfChunkAggregator extends ByteToMessageDecoder {
    private static final Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
    private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
    private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
    private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM =
        "Got byte {} while waiting for {}-{}-{}";

    /**
     * Default maximum chunk size.
     *
     * @deprecated Use {@link AbstractNetconfSessionNegotiator#DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE} instead.
     */
    @Deprecated(since = "4.0.1", forRemoval = true)
    public static final @NonNegative int DEFAULT_MAXIMUM_CHUNK_SIZE =
        AbstractNetconfSessionNegotiator.DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;

    private enum State {
        HEADER_ONE, // \n
        HEADER_TWO, // #
        HEADER_LENGTH_FIRST, // [1-9]
        HEADER_LENGTH_OTHER, // [0-9]*\n
        DATA,
        FOOTER_ONE, // \n
        FOOTER_TWO, // #
        FOOTER_THREE, // #
        FOOTER_FOUR, // \n
    }

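    // Illustrative walk through the framing handled by the states above; the chunk lengths and payload
    // are made up for this sketch, only the \n#<length>\n ... \n##\n structure comes from the code:
    //
    //   \n#4\n               HEADER_ONE, HEADER_TWO, HEADER_LENGTH_FIRST, HEADER_LENGTH_OTHER
    //   <rpc                 DATA (4 bytes)
    //   \n#18\n              FOOTER_ONE, FOOTER_TWO, then FOOTER_THREE sees a digit and starts a new chunk
    //    message-id="102"\n  DATA (18 bytes)
    //   \n##\n               FOOTER_ONE, FOOTER_TWO, FOOTER_THREE, FOOTER_FOUR: end-of-message marker
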
    private final int maxChunkSize;
    private State state = State.HEADER_ONE;
    private long chunkSize;
    private CompositeByteBuf chunk;

    /**
     * Construct an instance with maximum chunk size set to {@link #DEFAULT_MAXIMUM_CHUNK_SIZE}.
     *
     * @deprecated Prefer {@link #NetconfChunkAggregator(int)} for fine-grained control.
     */
    @Deprecated(since = "4.0.1", forRemoval = true)
    public NetconfChunkAggregator() {
        this(DEFAULT_MAXIMUM_CHUNK_SIZE);
    }

    /**
     * Construct an instance with the specified maximum chunk size.
     *
     * @param maxChunkSize maximum chunk size
     * @throws IllegalArgumentException if {@code maxChunkSize} is not positive
     */
    public NetconfChunkAggregator(final @NonNegative int maxChunkSize) {
        checkArgument(maxChunkSize > 0, "Invalid maximum chunk size %s, must be positive", maxChunkSize);
        this.maxChunkSize = maxChunkSize;
    }

    private static void checkNewLine(final byte byteToCheck, final String errorMessage) {
        if (byteToCheck != '\n') {
            LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte) '\n');
            throw new IllegalStateException(errorMessage);
        }
    }

    private static void checkHash(final byte byteToCheck, final String errorMessage) {
        if (byteToCheck != '#') {
            LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte) '#');
            throw new IllegalStateException(errorMessage);
        }
    }

    private void checkChunkSize() {
        if (chunkSize > maxChunkSize) {
            LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
            throw new IllegalStateException("Chunk size " + chunkSize + " exceeds maximum " + maxChunkSize);
        }
    }

    @Override
    protected void decode(final ChannelHandlerContext ctx,
                          final ByteBuf in, final List<Object> out) throws IllegalStateException {
        while (in.isReadable()) {
            switch (state) {
                case HEADER_ONE: {
                    final byte b = in.readByte();
                    checkNewLine(b, "Malformed chunk header encountered (byte 0)");
                    state = State.HEADER_TWO;
                    initChunk();
                    break;
                }
                case HEADER_TWO: {
                    final byte b = in.readByte();
                    checkHash(b, "Malformed chunk header encountered (byte 1)");
                    state = State.HEADER_LENGTH_FIRST;
                    break;
                }
                case HEADER_LENGTH_FIRST: {
                    final byte b = in.readByte();
                    chunkSize = processHeaderLengthFirst(b);
                    state = State.HEADER_LENGTH_OTHER;
                    break;
                }
                case HEADER_LENGTH_OTHER: {
                    final byte b = in.readByte();
                    if (b == '\n') {
                        state = State.DATA;
                        break;
                    }
                    if (b < '0' || b > '9') {
                        LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '0', (byte) '9');
                        throw new IllegalStateException("Invalid chunk size encountered");
                    }
                    chunkSize *= 10;
                    chunkSize += b - '0';
                    checkChunkSize();
                    break;
                }
                case DATA:
                    /*
                     * FIXME: this gathers all data into one big chunk before passing
                     *        it on. Make sure the pipeline can work with partial data
                     *        and then change this piece to pass the data on as it
                     *        comes through.
                     */
                    if (in.readableBytes() < chunkSize) {
                        LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
                        in.discardReadBytes();
                        return;
                    }
                    aggregateChunks(in.readBytes((int) chunkSize));
                    state = State.FOOTER_ONE;
                    break;
                case FOOTER_ONE: {
                    final byte b = in.readByte();
                    checkNewLine(b, "Malformed chunk footer encountered (byte 0)");
                    state = State.FOOTER_TWO;
                    chunkSize = 0;
                    break;
                }
                case FOOTER_TWO: {
                    final byte b = in.readByte();
                    checkHash(b, "Malformed chunk footer encountered (byte 1)");
                    state = State.FOOTER_THREE;
                    break;
                }
                case FOOTER_THREE: {
                    final byte b = in.readByte();
                    // In this state either a new chunk header or the end-of-message marker is expected,
                    // depending on the next byte
                    extractNewChunkOrMessageEnd(b);
                    break;
                }
                case FOOTER_FOUR: {
                    final byte b = in.readByte();
                    checkNewLine(b, "Malformed chunk footer encountered (byte 3)");
                    state = State.HEADER_ONE;
                    out.add(chunk);
                    chunk = null;
                    break;
                }
                default:
                    LOG.info("Unknown state.");
            }
        }

        in.discardReadBytes();
    }

    private void extractNewChunkOrMessageEnd(final byte byteToCheck) {
        if (isHeaderLengthFirst(byteToCheck)) {
            // Extract first header length digit of the new chunk
            chunkSize = processHeaderLengthFirst(byteToCheck);
            // Proceed with next chunk processing
            state = State.HEADER_LENGTH_OTHER;
        } else if (byteToCheck == '#') {
            state = State.FOOTER_FOUR;
        } else {
            LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM, byteToCheck, (byte) '#', (byte) '1', (byte) '9');
            throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
        }
    }

    private void initChunk() {
        chunk = Unpooled.compositeBuffer();
    }

    private void aggregateChunks(final ByteBuf newChunk) {
        chunk.addComponent(chunk.numComponents(), newChunk);

        // Update writer index, addComponent does not update it
        chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
    }

    private static int processHeaderLengthFirst(final byte byteToCheck) {
        if (!isHeaderLengthFirst(byteToCheck)) {
            LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, byteToCheck, (byte) '1', (byte) '9');
            throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
        }

        return byteToCheck - '0';
    }

    private static boolean isHeaderLengthFirst(final byte byteToCheck) {
        return byteToCheck >= '1' && byteToCheck <= '9';
    }
}
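
A minimal usage sketch, assuming the usual Netty bootstrap: the aggregator is an ordinary inbound
ByteToMessageDecoder, so it is installed into a ChannelPipeline ahead of whatever decodes the reassembled
messages. The initializer class and handler name below are illustrative only, not part of this file:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import org.opendaylight.netconf.nettyutil.AbstractNetconfSessionNegotiator;
import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator;

public final class ChunkAggregatorInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(final SocketChannel ch) {
        // Reassemble RFC 6242 chunked framing before any message-level decoding takes place,
        // using the non-deprecated bound from AbstractNetconfSessionNegotiator.
        ch.pipeline().addLast("chunkAggregator",
            new NetconfChunkAggregator(AbstractNetconfSessionNegotiator.DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE));
        // ... message decoder(s) follow here ...
    }
}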