2 * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import io.netty.channel.ChannelHandler;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelOutboundHandlerAdapter;
13 import io.netty.channel.ChannelPromise;
14 import io.netty.channel.embedded.EmbeddedChannel;
16 import java.net.InetSocketAddress;
17 import java.util.concurrent.TimeUnit;
19 import junit.framework.Assert;
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.mockito.Mock;
25 import org.mockito.MockitoAnnotations;
26 import org.opendaylight.openflowjava.protocol.impl.core.connection.ChannelOutboundQueue;
27 import org.opendaylight.openflowjava.protocol.impl.core.connection.ConnectionAdapterImpl;
28 import org.opendaylight.openflowjava.protocol.impl.core.connection.MessageListenerWrapper;
29 import org.opendaylight.openflowjava.protocol.impl.core.connection.ResponseExpectedRpcListener;
30 import org.opendaylight.openflowjava.protocol.impl.core.connection.RpcResponseKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
36 import com.google.common.cache.Cache;
37 import com.google.common.cache.CacheBuilder;
38 import com.google.common.cache.RemovalListener;
39 import com.google.common.cache.RemovalNotification;
// Test class exercising ChannelOutboundQueue through a ConnectionAdapterImpl that is
// backed by embedded Netty channels.
45 public class ChannelOutboundQueue02Test {
// Shared tally that the tests compare against the expected number of handler writes.
// NOTE(review): presumably incremented by EmbededChannelHandler.write; the increment is
// not visible in this view - confirm.
46 private static int counter;
// Expiration of response-cache entries; the tests pass it to expireAfterWrite together
// with TimeUnit.MINUTES.
47 private static final int RPC_RESPONSE_EXPIRATION = 1;
// Removal listener registered on every response cache the tests build.
48 private static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER =
49 new RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>>() {
51 public void onRemoval(
52 final RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
// Call discard() on the RPC listener that was stored under the removed key.
53 notification.getValue().discard();
// Mockito mocks of the input messages that the tests hand to the adapter.
57 @Mock EchoInput echoInput;
58 @Mock BarrierInput barrierInput;
59 @Mock EchoReplyInput echoReplyInput;
60 @Mock ExperimenterInput experimenterInput;
// Adapter under test; each test assigns a fresh instance, and tierDown() checks it for liveness.
61 private ConnectionAdapterImpl adapter;
// Response cache built per test and handed to the adapter via setResponseCache.
62 private Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache;
68 MockitoAnnotations.initMocks(this);
71 * Disconnect adapter after each test
// Cleanup hook: acts only when a test actually created an adapter and that adapter
// still reports itself alive.
// NOTE(review): the statement inside the guard is not visible in this view; per the
// description above it presumably disconnects the adapter - confirm.
74 public void tierDown(){
75 if (adapter != null && adapter.isAlive()) {
81 * Test write to closed / opened channel
// Scenario: two messages are submitted and the queue is told the channel went inactive
// (expected tally 0); then the queue is told the channel is active, two more messages
// are submitted and pending tasks are run (expected tally 2).
85 public void test01() throws Exception {
// Embedded channel with the test outbound handler installed.
86 EmbeddedChannel ec = new EmbeddedChannel(new EmbededChannelHandler());
// Adapter wrapping the embedded channel, with an unresolved localhost:9876 address.
87 adapter = new ConnectionAdapterImpl(ec,InetSocketAddress.createUnresolved("localhost", 9876));
// Response cache: concurrency level 1, entries expire RPC_RESPONSE_EXPIRATION minutes
// after being written, removals are reported to REMOVAL_LISTENER.
88 cache = CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
89 .removalListener(REMOVAL_LISTENER).build();
90 adapter.setResponseCache(cache);
// The last handler in the pipeline is cast to the outbound queue under test.
// NOTE(review): presumably installed there by the ConnectionAdapterImpl constructor - confirm.
91 ChannelOutboundQueue cq = (ChannelOutboundQueue) ec.pipeline().last();
// Submit two messages, then notify the queue that the channel became inactive.
93 adapter.barrier(barrierInput);
94 adapter.echo(echoInput);
95 cq.channelInactive(ec.pipeline().lastContext());
// No write is expected to have reached the handler.
// NOTE(review): relies on counter being 0 beforehand; no reset is visible here - confirm.
97 Assert.assertEquals("Wrong - ChannelOutboundHandlerAdapter.write was invoked on closed channel",0, counter);
// Notify the queue that the channel is active again and submit two more messages.
98 cq.channelActive(ec.pipeline().lastContext());
100 adapter.barrier(barrierInput);
101 adapter.experimenter(experimenterInput);
// Run the tasks pending on the embedded channel before checking the tally.
102 ec.runPendingTasks();
103 Assert.assertEquals("Wrong - ChannelOutboundHandlerAdapter.write has not been invoked on opened channel",2, counter);
107 * Test write to read only / writable channel
// Scenario: two messages are submitted and pending tasks run (expected tally 0), then
// two more messages are submitted and pending tasks run again (expected tally 4).
// NOTE(review): the calls that switch the channel between read-only and writable are not
// visible in this view; presumably setReadOnly() precedes the first batch and
// setWritable() precedes the second - confirm.
110 public void test02(){
// Embedded channel subclass that overrides isWritable(), with the test outbound handler installed.
111 ChangeWritableEmbededChannel ec = new ChangeWritableEmbededChannel(new EmbededChannelHandler());
// Adapter wrapping that channel, with an unresolved localhost:9876 address.
112 adapter = new ConnectionAdapterImpl(ec,InetSocketAddress.createUnresolved("localhost", 9876));
// Response cache: concurrency level 1, entries expire RPC_RESPONSE_EXPIRATION minutes
// after being written, removals are reported to REMOVAL_LISTENER.
113 cache = CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
114 .removalListener(REMOVAL_LISTENER).build();
115 adapter.setResponseCache(cache);
// First batch: two messages, then run the channel's pending tasks.
118 adapter.barrier(barrierInput);
119 adapter.echo(echoInput);
120 ec.runPendingTasks();
// Nothing is expected to have been written yet.
// NOTE(review): relies on counter being 0 beforehand; no reset is visible here - confirm.
121 Assert.assertEquals("Wrong - write to readonly channel",0, counter);
// Second batch: two more messages, then run the channel's pending tasks again.
123 adapter.echoReply(echoReplyInput);
124 adapter.echo(echoInput);
125 ec.runPendingTasks();
// Expected tally of 4 matches the four messages submitted in this test.
// NOTE(review): presumably the first batch stays queued and is flushed with the second - confirm.
126 Assert.assertEquals("Wrong - write to writtable channel",4, counter);
130 * Channel Handler for testing
// Outbound handler that both tests install in their embedded channel; it overrides
// write() and singles out messages that are MessageListenerWrapper instances.
// NOTE(review): the body of the instanceof branch is not visible in this view;
// presumably it increments the shared counter - confirm.
134 private class EmbededChannelHandler extends ChannelOutboundHandlerAdapter {
136 public void write(ChannelHandlerContext ctx, Object msg,
137 ChannelPromise promise) throws Exception {
138 if(msg instanceof MessageListenerWrapper){
145 * Class for testing - channel can change state to read only or writable
// EmbeddedChannel subclass used by test02; it overrides isWritable() and adds
// setWritable() / setReadOnly().
// NOTE(review): the bodies of these three methods are not visible in this view;
// presumably they read and toggle the isWrittable field - confirm.
149 private class ChangeWritableEmbededChannel extends EmbeddedChannel {
150 private boolean isWrittable;
// Passes the given handler straight to the EmbeddedChannel constructor.
151 public ChangeWritableEmbededChannel(ChannelHandler channelHandler){
152 super(channelHandler);
157 public boolean isWritable() {
161 public void setWritable(){
165 public void setReadOnly(){