/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import java.io.IOException; import java.io.InputStream; /** * Class provides {@link InputStream} functionality to users of virtual socket. */ public class ChannelInputStream extends InputStream implements ChannelInboundHandler { private final Object lock = new Object(); private final ByteBuf bb = Unpooled.buffer(); @Override public int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } int bytesRead = 1; synchronized (lock) { int c = read(); b[off] = (byte)c; if(this.bb.readableBytes() == 0) { return bytesRead; } int ltr = len-1; ltr = (ltr <= bb.readableBytes()) ? ltr : bb.readableBytes(); bb.readBytes(b, 1, ltr); bytesRead += ltr; } return bytesRead; } @Override public int read() throws IOException { synchronized (lock) { while (this.bb.readableBytes() == 0) { try { lock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } } return this.bb.readByte() & 0xFF; } } @Override public int available() throws IOException { synchronized (lock) { return this.bb.readableBytes(); } } public void channelRegistered(ChannelHandlerContext ctx) { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object o) { synchronized(lock) { this.bb.discardReadBytes(); this.bb.writeBytes((ByteBuf) o); lock.notifyAll(); } } public void channelReadComplete(ChannelHandlerContext ctx) { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object o) { ctx.fireUserEventTriggered(o); } public void channelWritabilityChanged(ChannelHandlerContext ctx) { ctx.fireChannelWritabilityChanged(); } public void handlerAdded(ChannelHandlerContext ctx) { } public void handlerRemoved(ChannelHandlerContext ctx) { } public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) { ctx.fireExceptionCaught(throwable); } }