package org.opendaylight.openflowplugin.openflow.md.core.plan;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Stack;
-import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
protected SystemNotificationsListener systemListener;
protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
- private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
- 8);
protected boolean planTouched = false;
private long proceedTimeout;
private ConnectionReadyListener connectionReadyListener;
+ private int planItemCounter;
+
+ private boolean autoRead = true;
+
/**
* default ctor
*/
}
@Override
- public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
- checkRpc(arg0, "barrier");
+ public synchronized Future<RpcResult<BarrierOutput>> barrier(
+ BarrierInput arg0) {
+ checkRpcAndNext(arg0, "barrier");
SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
- checkRpc(arg0, "echo");
+ public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
- checkRpc(arg0, "echoReply");
+ checkRpcAndNext(arg0, "echoReply");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
- checkRpc(arg0, "experimenter");
+ checkRpcAndNext(arg0, "experimenter");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
- checkRpc(arg0, "flowMod");
+ checkRpcAndNext(arg0, "flowMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
- public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
- checkRpc(arg0, "echo");
+ public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
+ GetAsyncInput arg0) {
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
- checkRpc(arg0, "echo");
+ public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
+ GetConfigInput arg0) {
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+ public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
GetFeaturesInput arg0) {
- checkRpc(arg0, "getFeatures");
+ checkRpcAndNext(arg0, "getFeatures");
Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+ public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
GetQueueConfigInput arg0) {
- checkRpc(arg0, "echo");
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
- checkRpc(arg0, "groupMod");
+ checkRpcAndNext(arg0, "groupMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> hello(HelloInput arg0) {
- checkRpc(arg0, "helloReply");
+ checkRpcAndNext(arg0, "helloReply");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
- checkRpc(arg0, "meterMod");
+ checkRpcAndNext(arg0, "meterMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
- checkRpc(arg0, "packetOut");
+ checkRpcAndNext(arg0, "packetOut");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> portMod(PortModInput arg0) {
- checkRpc(arg0, "portMod");
+ checkRpcAndNext(arg0, "portMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
- public Future<RpcResult<RoleRequestOutput>> roleRequest(
+ public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
RoleRequestInput arg0) {
- checkRpc(arg0, "echo");
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
- checkRpc(arg0, "setAsync");
+ checkRpcAndNext(arg0, "setAsync");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
- checkRpc(arg0, "setConfig");
+ checkRpcAndNext(arg0, "setConfig");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
- checkRpc(arg0, "tableMod");
+ checkRpcAndNext(arg0, "tableMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<Boolean> disconnect() {
- // TODO Auto-generated method stub
+ LOG.debug("adapter is told to disconnect");
+ DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
+ disconnectEventBuilder.setInfo("disconnected by plugin");
+ systemListener.onDisconnectEvent(disconnectEventBuilder.build());
return null;
}
@Override
public void checkListeners() {
- if (ofListener == null || systemListener == null || connectionReadyListener == null) {
+ if (ofListener == null || systemListener == null
+ || connectionReadyListener == null) {
occuredExceptions
.add(new IllegalStateException("missing listeners"));
}
* @param rpcName
* rpc yang name
*/
- private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
+ private boolean checkRpc(OfHeader rpcInput, String rpcName) {
String msg = null;
- LOG.debug("checking rpc: " + rpcName);
- if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
- msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
- + "]";
+ boolean finished = true;
+
+ if (eventPlan.isEmpty()) {
+ throw new IllegalStateException("eventPlan already depleted");
+ }
+
+ LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
+ rpcInput.getVersion(), rpcInput.getXid());
+ if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
+ && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
+ if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
+ SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
+ .peek());
+ msg = "expected [notification: "
+ + notifEvent.getPlannedNotification() + "], got ["
+ + rpcInput.getClass().getSimpleName() + "]";
+ } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
+ SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
+ .peek());
+ msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
+ + "], got [" + rpcInput.getClass().getSimpleName()
+ + "]";
+ }
} else {
- SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
- .peek();
- if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
- msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
- + "], got [" + rpcName + "]";
- } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
- msg = "expected xid [" + switchTestRpcEvent.getXid()
- + "], got [" + rpcInput.getXid() + "]";
+ if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
+ SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
+ .peek();
+ Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
+ .getWaitEventBag();
+ List<String> msgLot = new ArrayList<>();
+
+ if (eventBag == null || eventBag.isEmpty()) {
+ msg = "no wait events in bag";
+ } else {
+ finished = false;
+ for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
+ String msgPart = checkSingleRpcContent(rpcInput,
+ rpcName, switchTestWaitForRpc);
+
+ if (msgPart != null) {
+ msgLot.add(msgPart);
+ } else {
+ LOG.debug("wait event matched: {}", rpcName);
+ eventBag.remove(switchTestWaitForRpc);
+ if (eventBag.isEmpty()) {
+ finished = true;
+ }
+ msgLot.clear();
+ break;
+ }
+ }
+ }
+
+ if (!msgLot.isEmpty()) {
+ msg = Joiner.on(" | ").join(msgLot);
+ }
+ } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
+ SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
+ .peek();
+ msg = checkSingleRpcContent(rpcInput, rpcName,
+ switchTestRpcEvent);
}
}
if (msg != null) {
- LOG.debug("check .. FAILED: " + msg);
- occuredExceptions.add(new IllegalArgumentException(msg));
+ LOG.debug("rpc check .. FAILED: " + msg);
+ occuredExceptions.add(new IllegalArgumentException("step:"
+ + planItemCounter + " | " + msg));
+ } else {
+ LOG.debug("rpc check .. OK");
+ }
+ return finished;
+ }
+
+ /**
+ * @param rpcInput
+ * @param rpcName
+ * @param msgTmp
+ * @param switchTestWaitForRpc
+ * @return
+ */
+ private static String checkSingleRpcContent(OfHeader rpcInput,
+ String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+ String failureMsg = null;
+ if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
+ failureMsg = "expected rpc name ["
+ + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
+ + "]";
+ } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
+ failureMsg = "expected " + rpcName + ".xid ["
+ + switchTestWaitForRpc.getXid() + "], got ["
+ + rpcInput.getXid() + "]";
+ }
+
+ return failureMsg;
+ }
+
+ /**
+ * @param rpcInput
+ * rpc call parameter
+ * @param rpcName
+ * rpc yang name
+ */
+ private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
+ boolean finished = checkRpc(rpcInput, rpcName);
+ if (finished) {
+ next();
}
- LOG.debug("check .. OK");
}
/**
* discard current event, execute next, if possible
*/
- private synchronized void next() {
- LOG.debug("STEPPING TO NEXT event in plan");
+ private void next() {
+ LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
+ planItemCounter, eventPlan.peek());
eventPlan.pop();
+ planItemCounter++;
planTouched = true;
+ try {
+ Thread.sleep(JOB_DELAY);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
notify();
}
*/
private synchronized void proceed() {
boolean processed = false;
- LOG.debug("proceeding plan item: " + eventPlan.peek());
+ LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
+ eventPlan.peek());
if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
.peek();
.peek();
processRpcResponse(rpcResponse);
processed = true;
+ } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
+ SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
+ .peek();
+ try {
+ callbackEvent.getCallback().call();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ occuredExceptions.add(e);
+ }
+ processed = true;
}
if (processed) {
next();
} else {
try {
- LOG.debug("now waiting for HANDLER to act");
+ LOG.debug("now WAITING for OF_LISTENER to act ..");
wait(proceedTimeout);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
@Override
public void run() {
- LOG.debug("evenPlan STARTING ..");
+ LOG.debug("|---> evenPlan STARTING ..");
+ planItemCounter = 0;
while (!eventPlan.isEmpty()) {
planTouched = false;
proceed();
if (!planTouched) {
- occuredExceptions.add(new IllegalStateException(
- "eventPlan STALLED"));
+ occuredExceptions
+ .add(new IllegalStateException(
+ "eventPlan STALLED, planItemCounter="
+ + planItemCounter));
+ break;
}
}
try {
- pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
+ Thread.sleep(JOB_DELAY);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
- LOG.debug("eventPlan done");
+ LOG.debug("<---| eventPlan DONE");
}
/**
* @param notificationEvent
*/
- private void processNotification(
+ private synchronized void processNotification(
final SwitchTestNotificationEvent notificationEvent) {
- Callable<Void> notifyCmd = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Notification notification = notificationEvent
- .getPlannedNotification();
- LOG.debug("notificating HANDLER: "
- + notification.getClass().getSimpleName());
-
- // system events
- if (notification instanceof DisconnectEvent) {
- systemListener
- .onDisconnectEvent((DisconnectEvent) notification);
- }
- // of notifications
- else if (notification instanceof EchoRequestMessage) {
- ofListener
- .onEchoRequestMessage((EchoRequestMessage) notification);
- } else if (notification instanceof ErrorMessage) {
- ofListener.onErrorMessage((ErrorMessage) notification);
- } else if (notification instanceof ExperimenterMessage) {
- ofListener
- .onExperimenterMessage((ExperimenterMessage) notification);
- } else if (notification instanceof FlowRemovedMessage) {
- ofListener
- .onFlowRemovedMessage((FlowRemovedMessage) notification);
- } else if (notification instanceof HelloMessage) {
- ofListener.onHelloMessage((HelloMessage) notification);
- } else if (notification instanceof MultipartReplyMessage) {
- ofListener
- .onMultipartReplyMessage((MultipartReplyMessage) notification);
- } else if (notification instanceof MultipartRequestMessage) {
- ofListener
- .onMultipartRequestMessage((MultipartRequestMessage) notification);
- } else if (notification instanceof PacketInMessage) {
- ofListener
- .onPacketInMessage((PacketInMessage) notification);
- } else if (notification instanceof PortStatusMessage) {
- ofListener
- .onPortStatusMessage((PortStatusMessage) notification);
- }
- // default
- else {
- occuredExceptions.add(new IllegalStateException(
- "message listening not supported for type: "
- + notification.getClass()));
- }
-
- LOG.debug("thread finished");
- return null;
- }
+ Notification notification = notificationEvent.getPlannedNotification();
+ LOG.debug("notificating OF_LISTENER: "
+ + notification.getClass().getSimpleName());
- };
+ // system events
+ if (notification instanceof DisconnectEvent) {
+ systemListener.onDisconnectEvent((DisconnectEvent) notification);
+ }
+ // of notifications
+ else if (notification instanceof EchoRequestMessage) {
+ ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
+ } else if (notification instanceof ErrorMessage) {
+ ofListener.onErrorMessage((ErrorMessage) notification);
+ } else if (notification instanceof ExperimenterMessage) {
+ ofListener
+ .onExperimenterMessage((ExperimenterMessage) notification);
+ } else if (notification instanceof FlowRemovedMessage) {
+ ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
+ } else if (notification instanceof HelloMessage) {
+ ofListener.onHelloMessage((HelloMessage) notification);
+ } else if (notification instanceof MultipartReplyMessage) {
+ ofListener
+ .onMultipartReplyMessage((MultipartReplyMessage) notification);
+ } else if (notification instanceof PacketInMessage) {
+ ofListener.onPacketInMessage((PacketInMessage) notification);
+ } else if (notification instanceof PortStatusMessage) {
+ ofListener.onPortStatusMessage((PortStatusMessage) notification);
+ }
+ // default
+ else {
+ occuredExceptions.add(new IllegalStateException("step:"
+ + planItemCounter + " | "
+ + "message listening not supported for type: "
+ + notification.getClass()));
+ }
- pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+ LOG.debug("notification [" + notification.getClass().getSimpleName()
+ + "] .. done");
}
/**
* @param rpcResponse
*/
- private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
- Callable<Void> notifyCmd = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
-
- OfHeader plannedRpcResponseValue = rpcResponse
- .getPlannedRpcResponse();
- LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
-
- @SuppressWarnings("unchecked")
- SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
- .get(rpcResponse.getXid());
-
- if (response != null) {
- boolean successful = plannedRpcResponseValue != null;
- Collection<RpcError> errors;
- if (successful) {
- errors = Collections.emptyList();
- } else {
- errors = Lists
- .newArrayList(RpcErrors
- .getRpcError(
- "unit",
- "unit",
- "not requested",
- ErrorSeverity.ERROR,
- "planned response to RPC.id = "
- + rpcResponse.getXid(),
- ErrorType.RPC,
- new Exception(
- "rpc response failed (planned behavior)")));
- }
- RpcResult<?> result = Rpcs.getRpcResult(successful,
- plannedRpcResponseValue, errors);
- response.set(result);
- } else {
- String msg = "RpcResponse not expected: xid="
- + rpcResponse.getXid()
- + ", "
- + plannedRpcResponseValue.getClass()
- .getSimpleName();
- LOG.error(msg);
- occuredExceptions.add(new IllegalStateException(msg));
- }
-
- LOG.debug("thread finished");
- return null;
+ private synchronized void processRpcResponse(
+ final SwitchTestRcpResponseEvent rpcResponse) {
+ OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
+ LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+
+ @SuppressWarnings("unchecked")
+ SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+ .get(rpcResponse.getXid());
+
+ if (response != null) {
+ boolean successful = plannedRpcResponseValue != null;
+ Collection<RpcError> errors;
+ if (successful) {
+ errors = Collections.emptyList();
+ } else {
+ errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
+ "unit", "not requested", ErrorSeverity.ERROR,
+ "planned response to RPC.id = " + rpcResponse.getXid(),
+ ErrorType.RPC, new Exception(
+ "rpc response failed (planned behavior)")));
}
- };
+ RpcResult<?> result = Rpcs.getRpcResult(successful,
+ plannedRpcResponseValue, errors);
+ response.set(result);
+ } else {
+ String msg = "RpcResponse not expected: xid="
+ + rpcResponse.getXid() + ", "
+ + plannedRpcResponseValue.getClass().getSimpleName();
+ LOG.error(msg);
+ occuredExceptions.add(new IllegalStateException("step:"
+ + planItemCounter + " | " + msg));
+ }
- pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+ LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
}
/**
/**
* @return rpc future result
*/
- private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
+ private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
SettableFuture<RpcResult<Void>> result = SettableFuture.create();
- result.set(null);
+ List<RpcError> errors = Collections.emptyList();
+ result.set(Rpcs.getRpcResult(true, (Void) null, errors));
return result;
}
@Override
public void fireConnectionReadyNotification() {
- if (connectionReadyListener != null) {
- connectionReadyListener.onConnectionReady();
- }
+ connectionReadyListener.onConnectionReady();
}
@Override
ConnectionReadyListener connectionReadyListener) {
this.connectionReadyListener = connectionReadyListener;
}
+
+ @Override
+ public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
+ checkRpcAndNext(arg0, "multipartRequestInput");
+ SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ return result;
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isAutoRead() {
+ return autoRead;
+ }
+
+ @Override
+ public void setAutoRead(boolean autoRead) {
+ this.autoRead = autoRead;
+ }
+
}