/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.protocol.pcep.impl;
10 import java.io.IOException;
11 import java.io.PipedInputStream;
13 import org.opendaylight.protocol.framework.DeserializerException;
14 import org.opendaylight.protocol.framework.DocumentedException;
15 import org.opendaylight.protocol.framework.ProtocolInputStream;
16 import org.opendaylight.protocol.framework.ProtocolInputStreamFactory;
17 import org.opendaylight.protocol.framework.ProtocolMessage;
18 import org.opendaylight.protocol.framework.ProtocolMessageFactory;
23 public class PCEPInputStream implements ProtocolInputStream {
24 static final ProtocolInputStreamFactory FACTORY = new ProtocolInputStreamFactory() {
26 public ProtocolInputStream getProtocolInputStream(final PipedInputStream pis, final ProtocolMessageFactory pmf) {
27 return new PCEPInputStream(pis, pmf);
31 private final ProtocolMessageFactory factory;
32 private final PipedInputStream inputStream;
33 private PCEPMessageHeader header;
35 public PCEPInputStream(final PipedInputStream inputStream, final ProtocolMessageFactory factory) {
36 this.factory = factory;
37 this.inputStream = inputStream;
38 this.header = new PCEPMessageHeader();
42 * Check availability of a message in underlying input stream. A message is available when there are more or the
43 * same amount of bytes in the stream as the message length is specified in message header. If there are not enough
44 * bytes for the message or even to read a message header, return false.
46 * @return true if there are enough bytes to read a message false if there are not enough bytes to read a message or
51 public synchronized boolean isMessageAvailable() throws IOException {
52 if (!this.header.isParsed()) {
53 // No header, try to parse it
54 this.header = this.parseHeaderIfAvailable();
56 if (!this.header.isParsed()) {
57 // No luck, we do not have a message
61 // message length contains the size of the header too
62 if (this.inputStream.available() < (this.header.getLength() - PCEPMessageHeader.COMMON_HEADER_LENGTH)) {
69 * If there are enough bytes in the underlying stream, parse the message. Blocking, till there are enough bytes to
70 * read, therefore the call of method isMessageAvailable() is suggested first.
72 * @param factory protocol specific message factory
73 * @return protocol specific message
74 * @throws DeserializerException
76 * @throws DocumentedException
79 public synchronized ProtocolMessage getMessage() throws DeserializerException, IOException, DocumentedException {
80 // isMessageAvailable wasn't called, or there were not enough bytes to form message header
81 // blocking till the header is available
82 while (!this.header.isParsed()) {
83 this.header = this.parseHeaderIfAvailable();
85 final byte[] bytes = new byte[this.header.getLength() - PCEPMessageHeader.COMMON_HEADER_LENGTH]; // message
92 // blocking till the whole message is available
93 if (this.inputStream.read(bytes) == -1) {
94 throw new IOException("PipedInputStream was closed, before data could be read from it.");
96 final ProtocolMessage msg = this.factory.parse(bytes, this.header);
98 this.header.setParsed(); // if we have all the bytes to send the message for parsing, clear the header, to let know,
99 // we're ready to read another message
105 * Checks if there are enough bytes to parse a header and parses it. Non-blocking: if there are not enough bytes to
106 * parse a message header, returns false.
108 * @return cleared header if no header is available
109 * @return header object when enough data is available
111 private PCEPMessageHeader parseHeaderIfAvailable() throws IOException {
112 if (this.inputStream.available() < PCEPMessageHeader.COMMON_HEADER_LENGTH) {
113 this.header.setParsed();
117 final byte[] messageHeader = new byte[PCEPMessageHeader.COMMON_HEADER_LENGTH];
118 if (this.inputStream.read(messageHeader) == -1) {
119 this.header.setParsed();
122 return this.header.fromBytes(messageHeader);