this.msgType2 = msgType2;\r
}\r
\r
+ @Override\r
+ public int hashCode() {\r
+ final int prime = 31;\r
+ int result = super.hashCode();\r
+ result = prime * result + ((msgType2 == null) ? 0 : msgType2.hashCode());\r
+ return result;\r
+ }\r
+ \r
@Override\r
public boolean equals(Object obj) {\r
if (this == obj) {\r
public String toString() {\r
return super.toString() + " msgType2: " + msgType2.getName();\r
}\r
-}
\ No newline at end of file
+}
private final Queue<MessageHolder<?>> queue;
private final long maxWorkTime;
private final Channel channel;
- private InetSocketAddress address;
+ private final InetSocketAddress address;
- public ChannelOutboundQueue(final Channel channel, final int queueDepth) {
+ public ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
/*
this.queue = new LinkedBlockingQueue<>(queueDepth);
this.channel = Preconditions.checkNotNull(channel);
this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
+ this.address = address;
}
/**
public String toString() {
return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
}
-
- public void setAddress(InetSocketAddress address) {
- this.address = address;
- }
}
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(REMOVAL_LISTENER).build();
this.channel = Preconditions.checkNotNull(channel);
- this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH);
- output.setAddress(address);
+ this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
channel.pipeline().addLast(output);
LOG.debug("ConnectionAdapter created");
}
*/\r
@Test(expected=IllegalArgumentException.class)\r
public void testIncorrectQueueCreation() {\r
- new ChannelOutboundQueue(channel, 0);\r
+ new ChannelOutboundQueue(channel, 0, null);\r
}\r
\r
/**\r
*/\r
@Test\r
public void testEnqueue() {\r
- ChannelOutboundQueue queue = new ChannelOutboundQueue(channel, 1);\r
+ ChannelOutboundQueue queue = new ChannelOutboundQueue(channel, 1, null);\r
boolean enqueued = queue.enqueue(new SimpleRpcListener("INPUT", "Failed to send INPUT"));\r
Assert.assertTrue("Enqueue problem", enqueued);\r
enqueued = queue.enqueue(new SimpleRpcListener("INPUT", "Failed to send INPUT"));\r