2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
11 import java.net.InetSocketAddress;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.List;
19 import java.util.Stack;
20 import java.util.concurrent.Future;
22 import org.opendaylight.controller.sal.common.util.RpcErrors;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
25 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
65 import org.opendaylight.yangtools.yang.binding.Notification;
66 import org.opendaylight.yangtools.yang.common.RpcError;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
68 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
73 import com.google.common.base.Joiner;
74 import com.google.common.collect.Lists;
75 import com.google.common.util.concurrent.SettableFuture;
80 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
82 /** notify and rpc-response thread start default delay in [ms] */
83 public static final int JOB_DELAY = 50;
85 protected static final Logger LOG = LoggerFactory
86 .getLogger(ConnectionAdapterStackImpl.class);
88 protected Stack<? extends SwitchTestEvent> eventPlan;
89 protected OpenflowProtocolListener ofListener;
90 protected SystemNotificationsListener systemListener;
92 protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
93 protected boolean planTouched = false;
95 private long proceedTimeout;
97 protected List<Exception> occuredExceptions = new ArrayList<>();
99 private ConnectionReadyListener connectionReadyListener;
101 private int planItemCounter;
103 private boolean autoRead = true;
108 public ConnectionAdapterStackImpl() {
113 public synchronized Future<RpcResult<BarrierOutput>> barrier(
115 checkRpcAndNext(arg0, "barrier");
116 SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
121 public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
122 checkRpcAndNext(arg0, "echo");
123 Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
128 public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
129 checkRpcAndNext(arg0, "echoReply");
130 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
135 public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
136 checkRpcAndNext(arg0, "experimenter");
137 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
142 public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
143 checkRpcAndNext(arg0, "flowMod");
144 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
149 public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(
150 GetAsyncInput arg0) {
151 checkRpcAndNext(arg0, "echo");
152 Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
157 public synchronized Future<RpcResult<GetConfigOutput>> getConfig(
158 GetConfigInput arg0) {
159 checkRpcAndNext(arg0, "echo");
160 Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
165 public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
166 GetFeaturesInput arg0) {
167 checkRpcAndNext(arg0, "getFeatures");
168 Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
173 public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
174 GetQueueConfigInput arg0) {
175 checkRpcAndNext(arg0, "echo");
176 Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
181 public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
182 checkRpcAndNext(arg0, "groupMod");
183 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
188 public Future<RpcResult<Void>> hello(HelloInput arg0) {
189 checkRpcAndNext(arg0, "helloReply");
190 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
195 public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
196 checkRpcAndNext(arg0, "meterMod");
197 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
202 public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
203 checkRpcAndNext(arg0, "packetOut");
204 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
209 public Future<RpcResult<Void>> portMod(PortModInput arg0) {
210 checkRpcAndNext(arg0, "portMod");
211 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
216 public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
217 RoleRequestInput arg0) {
218 checkRpcAndNext(arg0, "echo");
219 Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
224 public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
225 checkRpcAndNext(arg0, "setAsync");
226 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
231 public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
232 checkRpcAndNext(arg0, "setConfig");
233 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
238 public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
239 checkRpcAndNext(arg0, "tableMod");
240 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
245 public Future<Boolean> disconnect() {
246 LOG.debug("adapter is told to disconnect");
247 DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
248 disconnectEventBuilder.setInfo("disconnected by plugin");
249 systemListener.onDisconnectEvent(disconnectEventBuilder.build());
254 public boolean isAlive() {
260 public void setMessageListener(OpenflowProtocolListener ofListener) {
261 this.ofListener = ofListener;
265 public void checkListeners() {
266 if (ofListener == null || systemListener == null
267 || connectionReadyListener == null) {
269 .add(new IllegalStateException("missing listeners"));
274 public void setSystemListener(SystemNotificationsListener systemListener) {
275 this.systemListener = systemListener;
284 private boolean checkRpc(OfHeader rpcInput, String rpcName) {
286 boolean finished = true;
288 if (eventPlan.isEmpty()) {
289 throw new IllegalStateException("eventPlan already depleted");
292 LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName,
293 rpcInput.getVersion(), rpcInput.getXid());
294 if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
295 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
296 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
297 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan
299 msg = "expected [notification: "
300 + notifEvent.getPlannedNotification() + "], got ["
301 + rpcInput.getClass().getSimpleName() + "]";
302 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
303 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan
305 msg = "expected [rpc: " + rpcEvent.getPlannedRpcResponse()
306 + "], got [" + rpcInput.getClass().getSimpleName()
310 if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
311 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
313 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll
315 List<String> msgLot = new ArrayList<>();
317 if (eventBag == null || eventBag.isEmpty()) {
318 msg = "no wait events in bag";
321 for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
322 String msgPart = checkSingleRpcContent(rpcInput,
323 rpcName, switchTestWaitForRpc);
325 if (msgPart != null) {
328 LOG.debug("wait event matched: {}", rpcName);
329 eventBag.remove(switchTestWaitForRpc);
330 if (eventBag.isEmpty()) {
339 if (!msgLot.isEmpty()) {
340 msg = Joiner.on(" | ").join(msgLot);
342 } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
343 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
345 msg = checkSingleRpcContent(rpcInput, rpcName,
351 LOG.debug("rpc check .. FAILED: " + msg);
352 occuredExceptions.add(new IllegalArgumentException("step:"
353 + planItemCounter + " | " + msg));
355 LOG.debug("rpc check .. OK");
364 * @param switchTestWaitForRpc
367 private static String checkSingleRpcContent(OfHeader rpcInput,
368 String rpcName, SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
369 String failureMsg = null;
370 if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
371 failureMsg = "expected rpc name ["
372 + switchTestWaitForRpc.getRpcName() + "], got [" + rpcName
374 } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
375 failureMsg = "expected " + rpcName + ".xid ["
376 + switchTestWaitForRpc.getXid() + "], got ["
377 + rpcInput.getXid() + "]";
389 private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
390 boolean finished = checkRpc(rpcInput, rpcName);
397 * discard current event, execute next, if possible
399 private void next() {
400 LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})",
401 planItemCounter, eventPlan.peek());
406 Thread.sleep(JOB_DELAY);
407 } catch (InterruptedException e) {
408 LOG.error(e.getMessage(), e);
414 * start or continue processing plan
416 private synchronized void proceed() {
417 boolean processed = false;
418 LOG.debug("proceeding plan item[{}]: {}", planItemCounter,
420 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
421 SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
423 processNotification(notification);
425 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
426 SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
428 processRpcResponse(rpcResponse);
430 } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
431 SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan
434 callbackEvent.getCallback().call();
435 } catch (Exception e) {
436 LOG.error(e.getMessage(), e);
437 occuredExceptions.add(e);
446 LOG.debug("now WAITING for OF_LISTENER to act ..");
447 wait(proceedTimeout);
448 } catch (InterruptedException e) {
449 LOG.error(e.getMessage(), e);
456 LOG.debug("|---> evenPlan STARTING ..");
458 while (!eventPlan.isEmpty()) {
463 .add(new IllegalStateException(
464 "eventPlan STALLED, planItemCounter="
471 Thread.sleep(JOB_DELAY);
472 } catch (InterruptedException e) {
473 LOG.error(e.getMessage(), e);
475 LOG.debug("<---| eventPlan DONE");
479 * @param notificationEvent
481 private synchronized void processNotification(
482 final SwitchTestNotificationEvent notificationEvent) {
484 Notification notification = notificationEvent.getPlannedNotification();
485 LOG.debug("notificating OF_LISTENER: "
486 + notification.getClass().getSimpleName());
489 if (notification instanceof DisconnectEvent) {
490 systemListener.onDisconnectEvent((DisconnectEvent) notification);
493 else if (notification instanceof EchoRequestMessage) {
494 ofListener.onEchoRequestMessage((EchoRequestMessage) notification);
495 } else if (notification instanceof ErrorMessage) {
496 ofListener.onErrorMessage((ErrorMessage) notification);
497 } else if (notification instanceof ExperimenterMessage) {
499 .onExperimenterMessage((ExperimenterMessage) notification);
500 } else if (notification instanceof FlowRemovedMessage) {
501 ofListener.onFlowRemovedMessage((FlowRemovedMessage) notification);
502 } else if (notification instanceof HelloMessage) {
503 ofListener.onHelloMessage((HelloMessage) notification);
504 } else if (notification instanceof MultipartReplyMessage) {
506 .onMultipartReplyMessage((MultipartReplyMessage) notification);
507 } else if (notification instanceof PacketInMessage) {
508 ofListener.onPacketInMessage((PacketInMessage) notification);
509 } else if (notification instanceof PortStatusMessage) {
510 ofListener.onPortStatusMessage((PortStatusMessage) notification);
514 occuredExceptions.add(new IllegalStateException("step:"
515 + planItemCounter + " | "
516 + "message listening not supported for type: "
517 + notification.getClass()));
520 LOG.debug("notification [" + notification.getClass().getSimpleName()
527 private synchronized void processRpcResponse(
528 final SwitchTestRcpResponseEvent rpcResponse) {
529 OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
530 LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
532 @SuppressWarnings("unchecked")
533 SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
534 .get(rpcResponse.getXid());
536 if (response != null) {
537 boolean successful = plannedRpcResponseValue != null;
538 Collection<RpcError> errors;
540 errors = Collections.emptyList();
542 errors = Lists.newArrayList(RpcErrors.getRpcError("unit",
543 "unit", "not requested", ErrorSeverity.ERROR,
544 "planned response to RPC.id = " + rpcResponse.getXid(),
545 ErrorType.RPC, new Exception(
546 "rpc response failed (planned behavior)")));
548 RpcResult<?> result = Rpcs.getRpcResult(successful,
549 plannedRpcResponseValue, errors);
550 response.set(result);
552 String msg = "RpcResponse not expected: xid="
553 + rpcResponse.getXid() + ", "
554 + plannedRpcResponseValue.getClass().getSimpleName();
556 occuredExceptions.add(new IllegalStateException("step:"
557 + planItemCounter + " | " + msg));
560 LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
566 * @return rpc future result
568 private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
570 SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
571 rpcResults.put(arg0.getXid(), result);
576 * @return rpc future result
578 private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
579 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
580 List<RpcError> errors = Collections.emptyList();
581 result.set(Rpcs.getRpcResult(true, (Void) null, errors));
587 * the eventPlan to set
589 public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
590 this.eventPlan = eventPlan;
594 * @param proceedTimeout
595 * max timeout for processing one planned event (in [ms])
597 public void setProceedTimeout(long proceedTimeout) {
598 this.proceedTimeout = proceedTimeout;
602 * @return the occuredExceptions
604 public List<Exception> getOccuredExceptions() {
605 return occuredExceptions;
609 public void fireConnectionReadyNotification() {
610 connectionReadyListener.onConnectionReady();
614 public void setConnectionReadyListener(
615 ConnectionReadyListener connectionReadyListener) {
616 this.connectionReadyListener = connectionReadyListener;
620 public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
621 checkRpcAndNext(arg0, "multipartRequestInput");
622 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
627 public InetSocketAddress getRemoteAddress() {
628 // TODO Auto-generated method stub
633 public boolean isAutoRead() {
638 public void setAutoRead(boolean autoRead) {
639 this.autoRead = autoRead;