2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.List;
18 import java.util.Stack;
19 import java.util.concurrent.Future;
21 import org.opendaylight.controller.sal.common.util.RpcErrors;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
24 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
64 import org.opendaylight.yangtools.yang.binding.Notification;
65 import org.opendaylight.yangtools.yang.common.RpcError;
66 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
72 import com.google.common.base.Joiner;
73 import com.google.common.collect.Lists;
74 import com.google.common.util.concurrent.SettableFuture;
79 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
81 /** notify and rpc-response thread start default delay in [ms] */
82 public static final int JOB_DELAY = 50;
84 protected static final Logger LOG = LoggerFactory
85 .getLogger(ConnectionAdapterStackImpl.class);
87 protected Stack<? extends SwitchTestEvent> eventPlan;
88 protected OpenflowProtocolListener ofListener;
89 protected SystemNotificationsListener systemListener;
91 protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
92 protected boolean planTouched = false;
94 private long proceedTimeout;
96 protected List<Exception> occuredExceptions = new ArrayList<>();
98 private ConnectionReadyListener connectionReadyListener;
100 private int planItemCounter;
105 public ConnectionAdapterStackImpl() {
110 public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
111 checkRpcAndNext(arg0, "barrier");
112 SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
117 public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
118 checkRpcAndNext(arg0, "echo");
119 Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
124 public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
125 checkRpcAndNext(arg0, "echoReply");
126 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
131 public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
132 checkRpcAndNext(arg0, "experimenter");
133 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
138 public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
139 checkRpcAndNext(arg0, "flowMod");
140 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
145 public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
146 checkRpcAndNext(arg0, "echo");
147 Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
152 public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
153 checkRpcAndNext(arg0, "echo");
154 Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
159 public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
160 GetFeaturesInput arg0) {
161 checkRpcAndNext(arg0, "getFeatures");
162 Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
167 public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
168 GetQueueConfigInput arg0) {
169 checkRpcAndNext(arg0, "echo");
170 Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
175 public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
176 checkRpcAndNext(arg0, "groupMod");
177 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
182 public Future<RpcResult<Void>> hello(HelloInput arg0) {
183 checkRpcAndNext(arg0, "helloReply");
184 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
189 public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
190 checkRpcAndNext(arg0, "meterMod");
191 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
196 public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
197 checkRpcAndNext(arg0, "packetOut");
198 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
203 public Future<RpcResult<Void>> portMod(PortModInput arg0) {
204 checkRpcAndNext(arg0, "portMod");
205 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
210 public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
211 RoleRequestInput arg0) {
212 checkRpcAndNext(arg0, "echo");
213 Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
218 public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
219 checkRpcAndNext(arg0, "setAsync");
220 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
225 public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
226 checkRpcAndNext(arg0, "setConfig");
227 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
232 public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
233 checkRpcAndNext(arg0, "tableMod");
234 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
239 public Future<Boolean> disconnect() {
240 LOG.info("adapter is told to disconnect");
241 DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
242 disconnectEventBuilder.setInfo("disconnected by plugin");
243 systemListener.onDisconnectEvent(disconnectEventBuilder.build());
248 public boolean isAlive() {
254 public void setMessageListener(OpenflowProtocolListener ofListener) {
255 this.ofListener = ofListener;
259 public void checkListeners() {
260 if (ofListener == null || systemListener == null || connectionReadyListener == null) {
262 .add(new IllegalStateException("missing listeners"));
267 public void setSystemListener(SystemNotificationsListener systemListener) {
268 this.systemListener = systemListener;
277 private boolean checkRpc(OfHeader rpcInput, String rpcName) {
279 boolean finished = true;
281 if (eventPlan.isEmpty()) {
282 throw new IllegalStateException("eventPlan already depleted");
285 LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
286 if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
287 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
288 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
289 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
290 msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
292 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
293 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
294 msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
298 if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
299 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
301 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
302 List<String> msgLot = new ArrayList<>();
304 if (eventBag == null || eventBag.isEmpty()) {
305 msg = "no wait events in bag";
308 for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
309 String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
311 if (msgPart != null) {
314 LOG.debug("wait event matched: {}", rpcName);
315 eventBag.remove(switchTestWaitForRpc);
316 if (eventBag.isEmpty()) {
325 if (!msgLot.isEmpty()) {
326 msg = Joiner.on(" | ").join(msgLot);
328 } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
329 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
331 msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
336 LOG.debug("rpc check .. FAILED: " + msg);
337 occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
339 LOG.debug("rpc check .. OK");
348 * @param switchTestWaitForRpc
351 private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
352 SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
353 String failureMsg = null;
354 if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
355 failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
356 + "], got [" + rpcName + "]";
357 } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
358 failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
359 + "], got [" + rpcInput.getXid() + "]";
371 private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
372 boolean finished = checkRpc(rpcInput, rpcName);
379 * discard current event, execute next, if possible
381 private void next() {
382 LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
387 Thread.sleep(JOB_DELAY);
388 } catch (InterruptedException e) {
389 LOG.error(e.getMessage(), e);
395 * start or continue processing plan
397 private synchronized void proceed() {
398 boolean processed = false;
399 LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
400 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
401 SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
403 processNotification(notification);
405 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
406 SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
408 processRpcResponse(rpcResponse);
410 } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
411 SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
413 callbackEvent.getCallback().call();
414 } catch (Exception e) {
415 LOG.error(e.getMessage(), e);
416 occuredExceptions.add(e);
425 LOG.debug("now WAITING for OF_LISTENER to act ..");
426 wait(proceedTimeout);
427 } catch (InterruptedException e) {
428 LOG.error(e.getMessage(), e);
435 LOG.debug("|---> evenPlan STARTING ..");
437 while (!eventPlan.isEmpty()) {
441 occuredExceptions.add(new IllegalStateException(
442 "eventPlan STALLED, planItemCounter="+planItemCounter));
448 Thread.sleep(JOB_DELAY);
449 } catch (InterruptedException e) {
450 LOG.error(e.getMessage(), e);
452 LOG.debug("<---| eventPlan DONE");
456 * @param notificationEvent
458 private synchronized void processNotification(
459 final SwitchTestNotificationEvent notificationEvent) {
461 Notification notification = notificationEvent
462 .getPlannedNotification();
463 LOG.debug("notificating OF_LISTENER: "
464 + notification.getClass().getSimpleName());
467 if (notification instanceof DisconnectEvent) {
469 .onDisconnectEvent((DisconnectEvent) notification);
472 else if (notification instanceof EchoRequestMessage) {
474 .onEchoRequestMessage((EchoRequestMessage) notification);
475 } else if (notification instanceof ErrorMessage) {
476 ofListener.onErrorMessage((ErrorMessage) notification);
477 } else if (notification instanceof ExperimenterMessage) {
479 .onExperimenterMessage((ExperimenterMessage) notification);
480 } else if (notification instanceof FlowRemovedMessage) {
482 .onFlowRemovedMessage((FlowRemovedMessage) notification);
483 } else if (notification instanceof HelloMessage) {
484 ofListener.onHelloMessage((HelloMessage) notification);
485 } else if (notification instanceof MultipartReplyMessage) {
487 .onMultipartReplyMessage((MultipartReplyMessage) notification);
488 } else if (notification instanceof PacketInMessage) {
490 .onPacketInMessage((PacketInMessage) notification);
491 } else if (notification instanceof PortStatusMessage) {
493 .onPortStatusMessage((PortStatusMessage) notification);
497 occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
498 "message listening not supported for type: "
499 + notification.getClass()));
502 LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
508 private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
509 OfHeader plannedRpcResponseValue = rpcResponse
510 .getPlannedRpcResponse();
511 LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
513 @SuppressWarnings("unchecked")
514 SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
515 .get(rpcResponse.getXid());
517 if (response != null) {
518 boolean successful = plannedRpcResponseValue != null;
519 Collection<RpcError> errors;
521 errors = Collections.emptyList();
524 .newArrayList(RpcErrors
530 "planned response to RPC.id = "
531 + rpcResponse.getXid(),
534 "rpc response failed (planned behavior)")));
536 RpcResult<?> result = Rpcs.getRpcResult(successful,
537 plannedRpcResponseValue, errors);
538 response.set(result);
540 String msg = "RpcResponse not expected: xid="
541 + rpcResponse.getXid()
543 + plannedRpcResponseValue.getClass()
546 occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
549 LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
555 * @return rpc future result
557 private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
559 SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
560 rpcResults.put(arg0.getXid(), result);
565 * @return rpc future result
567 private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
568 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
569 List<RpcError> errors = Collections.emptyList();
570 result.set(Rpcs.getRpcResult(true, (Void) null, errors));
576 * the eventPlan to set
578 public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
579 this.eventPlan = eventPlan;
583 * @param proceedTimeout
584 * max timeout for processing one planned event (in [ms])
586 public void setProceedTimeout(long proceedTimeout) {
587 this.proceedTimeout = proceedTimeout;
591 * @return the occuredExceptions
593 public List<Exception> getOccuredExceptions() {
594 return occuredExceptions;
598 public void fireConnectionReadyNotification() {
599 connectionReadyListener.onConnectionReady();
603 public void setConnectionReadyListener(
604 ConnectionReadyListener connectionReadyListener) {
605 this.connectionReadyListener = connectionReadyListener;
609 public Future<RpcResult<Void>> multipartRequest(
610 MultipartRequestInput arg0) {
611 checkRpcAndNext(arg0, "multipartRequestInput");
612 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();