import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Stack;
import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
}
@Override
- public Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
- checkRpc(arg0, "barrier");
+ public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
+ checkRpcAndNext(arg0, "barrier");
SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
- checkRpc(arg0, "echo");
+ public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
- checkRpc(arg0, "echoReply");
+ checkRpcAndNext(arg0, "echoReply");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
- checkRpc(arg0, "experimenter");
+ checkRpcAndNext(arg0, "experimenter");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
- checkRpc(arg0, "flowMod");
+ checkRpcAndNext(arg0, "flowMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
- public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
- checkRpc(arg0, "echo");
+ public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
- checkRpc(arg0, "echo");
+ public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<GetFeaturesOutput>> getFeatures(
+ public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
GetFeaturesInput arg0) {
- checkRpc(arg0, "getFeatures");
+ checkRpcAndNext(arg0, "getFeatures");
Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
- public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
+ public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
GetQueueConfigInput arg0) {
- checkRpc(arg0, "echo");
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
- checkRpc(arg0, "groupMod");
+ checkRpcAndNext(arg0, "groupMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> hello(HelloInput arg0) {
- checkRpc(arg0, "helloReply");
+ checkRpcAndNext(arg0, "helloReply");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
- checkRpc(arg0, "meterMod");
+ checkRpcAndNext(arg0, "meterMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
- checkRpc(arg0, "packetOut");
+ checkRpcAndNext(arg0, "packetOut");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> portMod(PortModInput arg0) {
- checkRpc(arg0, "portMod");
+ checkRpcAndNext(arg0, "portMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
- public Future<RpcResult<RoleRequestOutput>> roleRequest(
+ public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
RoleRequestInput arg0) {
- checkRpc(arg0, "echo");
+ checkRpcAndNext(arg0, "echo");
Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
- next();
return result;
}
@Override
public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
- checkRpc(arg0, "setAsync");
+ checkRpcAndNext(arg0, "setAsync");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
- checkRpc(arg0, "setConfig");
+ checkRpcAndNext(arg0, "setConfig");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
- checkRpc(arg0, "tableMod");
+ checkRpcAndNext(arg0, "tableMod");
SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
- next();
return result;
}
@Override
public Future<Boolean> disconnect() {
- // TODO Auto-generated method stub
+ LOG.info("adapter is told to disconnect");
+ DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
+ disconnectEventBuilder.setInfo("disconnected by plugin");
+ systemListener.onDisconnectEvent(disconnectEventBuilder.build());
return null;
}
* @param rpcName
* rpc yang name
*/
- private synchronized void checkRpc(OfHeader rpcInput, String rpcName) {
+ private boolean checkRpc(OfHeader rpcInput, String rpcName) {
String msg = null;
- LOG.debug("checking rpc: " + rpcName);
- if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
+ boolean finished = true;
+
+ if (eventPlan.isEmpty()) {
+ throw new IllegalStateException("eventPlan already depleted");
+ }
+
+ LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
+ if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
+ && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
+ "]";
}
} else {
- SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
- .peek();
- if (!rpcName.equals(switchTestRpcEvent.getRpcName())) {
- msg = "expected rpc name [" + switchTestRpcEvent.getRpcName()
- + "], got [" + rpcName + "]";
- } else if (!rpcInput.getXid().equals(switchTestRpcEvent.getXid())) {
- msg = "expected xid [" + switchTestRpcEvent.getXid()
- + "], got [" + rpcInput.getXid() + "]";
+ if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
+ SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
+ .peek();
+ Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
+ List<String> msgLot = new ArrayList<>();
+
+ if (eventBag == null || eventBag.isEmpty()) {
+ msg = "no wait events in bag";
+ } else {
+ finished = false;
+ for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
+ String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
+
+ if (msgPart != null) {
+ msgLot.add(msgPart);
+ } else {
+ LOG.debug("wait event matched: {}", rpcName);
+ eventBag.remove(switchTestWaitForRpc);
+ if (eventBag.isEmpty()) {
+ finished = true;
+ }
+ msgLot.clear();
+ break;
+ }
+ }
+ }
+
+ if (!msgLot.isEmpty()) {
+ msg = Joiner.on(" | ").join(msgLot);
+ }
+ } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
+ SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
+ .peek();
+ msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
}
}
if (msg != null) {
LOG.debug("rpc check .. FAILED: " + msg);
- occuredExceptions.add(new IllegalArgumentException(msg));
+ occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
+ } else {
+ LOG.debug("rpc check .. OK");
+ }
+ return finished;
+ }
+
+ /**
+ * @param rpcInput
+ * @param rpcName
+ * @param msgTmp
+ * @param switchTestWaitForRpc
+ * @return
+ */
+ private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
+ SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
+ String failureMsg = null;
+ if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
+ failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
+ + "], got [" + rpcName + "]";
+ } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
+ failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
+ + "], got [" + rpcInput.getXid() + "]";
+ }
+
+ return failureMsg;
+ }
+
+ /**
+ * @param rpcInput
+ * rpc call parameter
+ * @param rpcName
+ * rpc yang name
+ */
+ private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
+ boolean finished = checkRpc(rpcInput, rpcName);
+ if (finished) {
+ next();
}
- LOG.debug("rpc check .. OK");
}
/**
* discard current event, execute next, if possible
*/
- private synchronized void next() {
+ private void next() {
LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
eventPlan.pop();
planItemCounter ++;
planTouched = true;
+ try {
+ Thread.sleep(JOB_DELAY);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
notify();
}
.peek();
processRpcResponse(rpcResponse);
processed = true;
+ } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
+ SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
+ try {
+ callbackEvent.getCallback().call();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ occuredExceptions.add(e);
+ }
+ processed = true;
}
if (processed) {
proceed();
if (!planTouched) {
occuredExceptions.add(new IllegalStateException(
- "eventPlan STALLED"));
+ "eventPlan STALLED, planItemCounter="+planItemCounter));
+ break;
}
}
/**
* @param notificationEvent
*/
- private void processNotification(
+ private synchronized void processNotification(
final SwitchTestNotificationEvent notificationEvent) {
Notification notification = notificationEvent
}
// default
else {
- occuredExceptions.add(new IllegalStateException(
+ occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
"message listening not supported for type: "
+ notification.getClass()));
}
/**
* @param rpcResponse
*/
- private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
+ private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
OfHeader plannedRpcResponseValue = rpcResponse
.getPlannedRpcResponse();
LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+ plannedRpcResponseValue.getClass()
.getSimpleName();
LOG.error(msg);
- occuredExceptions.add(new IllegalStateException(msg));
+ occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
}
LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
/**
* @return rpc future result
*/
- private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
+ private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
SettableFuture<RpcResult<Void>> result = SettableFuture.create();
List<RpcError> errors = Collections.emptyList();
result.set(Rpcs.getRpcResult(true, (Void) null, errors));