2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.plan;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.HashMap;
15 import java.util.List;
18 import java.util.Stack;
19 import java.util.concurrent.Future;
21 import org.opendaylight.controller.sal.common.util.RpcErrors;
22 import org.opendaylight.controller.sal.common.util.Rpcs;
23 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
24 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SendMultipartRequestMessageInput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEventBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
65 import org.opendaylight.yangtools.yang.binding.Notification;
66 import org.opendaylight.yangtools.yang.common.RpcError;
67 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
68 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
69 import org.opendaylight.yangtools.yang.common.RpcResult;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
73 import com.google.common.base.Joiner;
74 import com.google.common.collect.Lists;
75 import com.google.common.util.concurrent.SettableFuture;
/**
 * {@link ConnectionAdapter} implementation that is driven by a planned stack of
 * {@link SwitchTestEvent}s. Rpc calls made on this adapter are checked against
 * the plan, planned notifications and rpc responses are delivered to the
 * registered listeners, and detected problems are collected in
 * {@link #occuredExceptions}.
 */
80 public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
82 /** notify and rpc-response thread start default delay in [ms] */
83 public static final int JOB_DELAY = 50;
// shared logger of this class
85 protected static final Logger LOG = LoggerFactory
86 .getLogger(ConnectionAdapterStackImpl.class);
// planned events; the element on top of the stack is the step being processed
88 protected Stack<? extends SwitchTestEvent> eventPlan;
// receiver of the planned OpenFlow protocol notifications
89 protected OpenflowProtocolListener ofListener;
// receiver of system notifications such as disconnect events
90 protected SystemNotificationsListener systemListener;
// pending two-way rpc results, keyed by the xid of the request
92 protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
// NOTE(review): usage is not shown here; presumably marks that the plan moved on - confirm
93 protected boolean planTouched = false;
// max time in [ms] to wait for one planned event
95 private long proceedTimeout;
// problems collected while the plan is being played
97 protected List<Exception> occuredExceptions = new ArrayList<>();
// listener notified by fireConnectionReadyNotification()
99 private ConnectionReadyListener connectionReadyListener;
// number of the current plan step, used in log and error messages
101 private int planItemCounter;
// NOTE(review): constructor body is not shown here - confirm what it initializes
106 public ConnectionAdapterStackImpl() {
/**
 * Handles a barrier request: checks the call against the plan under rpc name
 * "barrier", then registers a pending result future keyed by the request xid
 * so that a planned rpc response can complete it later.
 *
 * @param arg0 barrier request
 * @return future of the barrier rpc result
 */
111 public synchronized Future<RpcResult<BarrierOutput>> barrier(BarrierInput arg0) {
112 checkRpcAndNext(arg0, "barrier");
113 SettableFuture<RpcResult<BarrierOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles an echo request: checks the call against the plan under rpc name
 * "echo", then registers a pending result future keyed by the request xid.
 *
 * @param arg0 echo request
 * @return future of the echo rpc result
 */
118 public synchronized Future<RpcResult<EchoOutput>> echo(EchoInput arg0) {
119 checkRpcAndNext(arg0, "echo");
120 Future<RpcResult<EchoOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles an echo reply (one-way rpc): checks the call against the plan under
 * rpc name "echoReply" and prepares an already completed, successful result.
 *
 * @param arg0 echo reply message
 * @return future of the (void) rpc result
 */
125 public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
126 checkRpcAndNext(arg0, "echoReply");
127 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles an experimenter message (one-way rpc): checks the call against the
 * plan under rpc name "experimenter" and prepares an already completed,
 * successful result.
 *
 * @param arg0 experimenter message
 * @return future of the (void) rpc result
 */
132 public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
133 checkRpcAndNext(arg0, "experimenter");
134 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a flow modification (one-way rpc): checks the call against the plan
 * under rpc name "flowMod" and prepares an already completed, successful result.
 *
 * @param arg0 flow-mod message
 * @return future of the (void) rpc result
 */
139 public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
140 checkRpcAndNext(arg0, "flowMod");
141 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a get-async request: checks the call against the plan, then
 * registers a pending result future keyed by the request xid.
 *
 * @param arg0 get-async request
 * @return future of the get-async rpc result
 */
146 public synchronized Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput arg0) {
// NOTE(review): the rpc name "echo" looks copy-pasted from echo(); presumably it
// should be "getAsync" - confirm against the plans that wait for this rpc
147 checkRpcAndNext(arg0, "echo");
148 Future<RpcResult<GetAsyncOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles a get-config request: checks the call against the plan, then
 * registers a pending result future keyed by the request xid.
 *
 * @param arg0 get-config request
 * @return future of the get-config rpc result
 */
153 public synchronized Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput arg0) {
// NOTE(review): the rpc name "echo" looks copy-pasted from echo(); presumably it
// should be "getConfig" - confirm against the plans that wait for this rpc
154 checkRpcAndNext(arg0, "echo");
155 Future<RpcResult<GetConfigOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles a get-features request: checks the call against the plan under rpc
 * name "getFeatures", then registers a pending result future keyed by the
 * request xid.
 *
 * @param arg0 get-features request
 * @return future of the get-features rpc result
 */
160 public synchronized Future<RpcResult<GetFeaturesOutput>> getFeatures(
161 GetFeaturesInput arg0) {
162 checkRpcAndNext(arg0, "getFeatures");
163 Future<RpcResult<GetFeaturesOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles a get-queue-config request: checks the call against the plan, then
 * registers a pending result future keyed by the request xid.
 *
 * @param arg0 get-queue-config request
 * @return future of the get-queue-config rpc result
 */
168 public synchronized Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
169 GetQueueConfigInput arg0) {
// NOTE(review): the rpc name "echo" looks copy-pasted from echo(); presumably it
// should be "getQueueConfig" - confirm against the plans that wait for this rpc
170 checkRpcAndNext(arg0, "echo");
171 Future<RpcResult<GetQueueConfigOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles a group modification (one-way rpc): checks the call against the plan
 * under rpc name "groupMod" and prepares an already completed, successful result.
 *
 * @param arg0 group-mod message
 * @return future of the (void) rpc result
 */
176 public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
177 checkRpcAndNext(arg0, "groupMod");
178 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a hello message (one-way rpc): checks the call against the plan and
 * prepares an already completed, successful result.
 *
 * @param arg0 hello message
 * @return future of the (void) rpc result
 */
183 public Future<RpcResult<Void>> hello(HelloInput arg0) {
// the call is checked under the name "helloReply", not "hello"
// NOTE(review): presumably the plans wait for "helloReply" on purpose - confirm
184 checkRpcAndNext(arg0, "helloReply");
185 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a meter modification (one-way rpc): checks the call against the plan
 * under rpc name "meterMod" and prepares an already completed, successful result.
 *
 * @param arg0 meter-mod message
 * @return future of the (void) rpc result
 */
190 public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
191 checkRpcAndNext(arg0, "meterMod");
192 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a packet-out message (one-way rpc): checks the call against the plan
 * under rpc name "packetOut" and prepares an already completed, successful result.
 *
 * @param arg0 packet-out message
 * @return future of the (void) rpc result
 */
197 public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
198 checkRpcAndNext(arg0, "packetOut");
199 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a port modification (one-way rpc): checks the call against the plan
 * under rpc name "portMod" and prepares an already completed, successful result.
 *
 * @param arg0 port-mod message
 * @return future of the (void) rpc result
 */
204 public Future<RpcResult<Void>> portMod(PortModInput arg0) {
205 checkRpcAndNext(arg0, "portMod");
206 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a role request: checks the call against the plan, then registers a
 * pending result future keyed by the request xid.
 *
 * @param arg0 role request
 * @return future of the role-request rpc result
 */
211 public synchronized Future<RpcResult<RoleRequestOutput>> roleRequest(
212 RoleRequestInput arg0) {
// NOTE(review): the rpc name "echo" looks copy-pasted from echo(); presumably it
// should be "roleRequest" - confirm against the plans that wait for this rpc
213 checkRpcAndNext(arg0, "echo");
214 Future<RpcResult<RoleRequestOutput>> result = createAndRegisterRpcResult(arg0);
/**
 * Handles a set-async message (one-way rpc): checks the call against the plan
 * under rpc name "setAsync" and prepares an already completed, successful result.
 *
 * @param arg0 set-async message
 * @return future of the (void) rpc result
 */
219 public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
220 checkRpcAndNext(arg0, "setAsync");
221 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a set-config message (one-way rpc): checks the call against the plan
 * under rpc name "setConfig" and prepares an already completed, successful result.
 *
 * @param arg0 set-config message
 * @return future of the (void) rpc result
 */
226 public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
227 checkRpcAndNext(arg0, "setConfig");
228 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Handles a table modification (one-way rpc): checks the call against the plan
 * under rpc name "tableMod" and prepares an already completed, successful result.
 *
 * @param arg0 table-mod message
 * @return future of the (void) rpc result
 */
233 public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
234 checkRpcAndNext(arg0, "tableMod");
235 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
/**
 * Logs the disconnect request and notifies the system listener with a
 * disconnect event whose info text is "disconnected by plugin".
 *
 * @return future of the disconnect outcome
 */
240 public Future<Boolean> disconnect() {
241 LOG.info("adapter is told to disconnect");
242 DisconnectEventBuilder disconnectEventBuilder = new DisconnectEventBuilder();
243 disconnectEventBuilder.setInfo("disconnected by plugin");
// NOTE(review): systemListener is used without a null check here
244 systemListener.onDisconnectEvent(disconnectEventBuilder.build());
/**
 * Reports whether this adapter is alive.
 * NOTE(review): the method body is not shown here - confirm what it returns.
 */
249 public boolean isAlive() {
/**
 * Sets the listener that receives the planned OpenFlow protocol notifications.
 *
 * @param ofListener listener to store
 */
255 public void setMessageListener(OpenflowProtocolListener ofListener) {
256 this.ofListener = ofListener;
/**
 * Verifies that the protocol listener, the system listener and the
 * connection-ready listener are all set. If any of them is missing, an
 * {@link IllegalStateException} ("missing listeners") is recorded rather than
 * thrown.
 */
260 public void checkListeners() {
261 if (ofListener == null || systemListener == null || connectionReadyListener == null) {
// NOTE(review): the receiver of add(..) is not shown; presumably occuredExceptions - confirm
263 .add(new IllegalStateException("missing listeners"));
/**
 * Sets the listener that receives system notifications (e.g. disconnect events).
 *
 * @param systemListener listener to store
 */
268 public void setSystemListener(SystemNotificationsListener systemListener) {
269 this.systemListener = systemListener;
/**
 * Checks an incoming rpc against the event on top of the plan.
 * <p>
 * The top event has to be a {@link SwitchTestWaitForRpcEvent} or a
 * {@link SwitchTestWaitForAllEvent}; for a notification or rpc-response event
 * a message describing what was planned instead is built. For a wait-for-all
 * event the rpc is compared with the events in its bag and a matching event is
 * removed from the bag; for a single wait event the rpc is compared directly
 * via {@link #checkSingleRpcContent}. A failed check is logged and recorded in
 * {@link #occuredExceptions} as an {@link IllegalArgumentException} prefixed
 * with the current step number.
 *
 * @param rpcInput rpc message to check; its version and xid are logged
 * @param rpcName name under which the calling adapter method reports the rpc
 * @return NOTE(review): the return statement is not shown; presumably the
 *         {@code finished} flag - confirm
 * @throws IllegalStateException if the plan is already empty
 */
278 private boolean checkRpc(OfHeader rpcInput, String rpcName) {
// NOTE(review): the lines that may change this flag are not shown here
280 boolean finished = true;
// an rpc arriving after the plan ran out is a hard failure
282 if (eventPlan.isEmpty()) {
283 throw new IllegalStateException("eventPlan already depleted");
286 LOG.debug("checking rpc: name={}, ver={}, xid={}", rpcName, rpcInput.getVersion(), rpcInput.getXid());
// the current step does not wait for an rpc: describe what it planned instead
287 if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)
288 && !(eventPlan.peek() instanceof SwitchTestWaitForAllEvent)) {
289 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
290 SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
291 msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
293 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
294 SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
295 msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
// wait-for-all step: the rpc is compared with the events left in the bag
299 if (eventPlan.peek() instanceof SwitchTestWaitForAllEvent) {
300 SwitchTestWaitForAllEvent switchTestWaitForAll = (SwitchTestWaitForAllEvent) eventPlan
302 Set<SwitchTestWaitForRpcEvent> eventBag = switchTestWaitForAll.getWaitEventBag();
303 List<String> msgLot = new ArrayList<>();
305 if (eventBag == null || eventBag.isEmpty()) {
306 msg = "no wait events in bag";
309 for (SwitchTestWaitForRpcEvent switchTestWaitForRpc : eventBag) {
// a null message part means the rpc matched this bag entry
310 String msgPart = checkSingleRpcContent(rpcInput, rpcName, switchTestWaitForRpc);
312 if (msgPart != null) {
315 LOG.debug("wait event matched: {}", rpcName);
// NOTE(review): removing from the bag inside the for-each can raise a
// ConcurrentModificationException on the next iteration unless the loop is
// left right after the removal - confirm that a break follows
316 eventBag.remove(switchTestWaitForRpc);
317 if (eventBag.isEmpty()) {
// the collected mismatch descriptions are joined into one message
326 if (!msgLot.isEmpty()) {
327 msg = Joiner.on(" | ").join(msgLot);
// single wait step: compare the rpc with it directly
329 } else if (eventPlan.peek() instanceof SwitchTestWaitForRpcEvent) {
330 SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
332 msg = checkSingleRpcContent(rpcInput, rpcName, switchTestRpcEvent);
// a failure is recorded together with the step number instead of being thrown
337 LOG.debug("rpc check .. FAILED: " + msg);
338 occuredExceptions.add(new IllegalArgumentException("step:"+planItemCounter+" | "+msg));
340 LOG.debug("rpc check .. OK");
349 * @param switchTestWaitForRpc
/**
 * Compares one rpc with a single planned wait event: first by rpc name and,
 * only when the names are equal, by xid. The first mismatch found is described
 * in {@code failureMsg}, which stays {@code null} when both are equal.
 *
 * @param rpcInput rpc message whose xid is compared
 * @param rpcName name of the rpc being checked
 * @param switchTestWaitForRpc planned event supplying the expected name and xid
 * @return NOTE(review): the return statement is not shown; presumably
 *         {@code failureMsg} - confirm
 */
352 private static String checkSingleRpcContent(OfHeader rpcInput, String rpcName,
353 SwitchTestWaitForRpcEvent switchTestWaitForRpc) {
354 String failureMsg = null;
355 if (!rpcName.equals(switchTestWaitForRpc.getRpcName())) {
356 failureMsg = "expected rpc name [" + switchTestWaitForRpc.getRpcName()
357 + "], got [" + rpcName + "]";
// NOTE(review): rpcInput.getXid() is dereferenced, so a null xid on the rpc would fail here
358 } else if (!rpcInput.getXid().equals(switchTestWaitForRpc.getXid())) {
359 failureMsg = "expected "+rpcName+".xid [" + switchTestWaitForRpc.getXid()
360 + "], got [" + rpcInput.getXid() + "]";
/**
 * Checks the rpc against the current plan step via {@link #checkRpc}.
 * NOTE(review): the rest of the body is not shown; judging by the name it
 * presumably steps to the next plan item when the check reports it finished -
 * confirm.
 *
 * @param rpcInput rpc message to check
 * @param rpcName name under which the calling adapter method reports the rpc
 */
372 private synchronized void checkRpcAndNext(OfHeader rpcInput, String rpcName) {
373 boolean finished = checkRpc(rpcInput, rpcName);
380 * discard current event, execute next, if possible
/**
 * Leaves the current plan step: logs the step being left and pauses for
 * {@link #JOB_DELAY} milliseconds.
 * NOTE(review): the lines between the log call and the sleep are not shown;
 * presumably they discard the top event and advance the step counter - confirm.
 */
382 private void next() {
383 LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
388 Thread.sleep(JOB_DELAY);
389 } catch (InterruptedException e) {
// NOTE(review): the interruption is only logged in the lines shown; the
// interrupt status of the thread is not restored here
390 LOG.error(e.getMessage(), e);
396 * start or continue processing plan
/**
 * Processes the event on top of the plan according to its type: a notification
 * event is delivered via {@link #processNotification}, an rpc-response event
 * via {@link #processRpcResponse}, and a callback event by invoking its
 * callback (an exception thrown by the callback is logged and recorded in
 * {@link #occuredExceptions}). The method can also wait on this object's
 * monitor for up to {@link #proceedTimeout} milliseconds for the listener to
 * act.
 * NOTE(review): the condition guarding the wait is not shown here - confirm
 * when the wait is entered.
 */
398 private synchronized void proceed() {
399 boolean processed = false;
400 LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
401 if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
402 SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
404 processNotification(notification);
406 } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
407 SwitchTestRcpResponseEvent rpcResponse = (SwitchTestRcpResponseEvent) eventPlan
409 processRpcResponse(rpcResponse);
411 } else if (eventPlan.peek() instanceof SwitchTestCallbackEvent) {
412 SwitchTestCallbackEvent callbackEvent = (SwitchTestCallbackEvent) eventPlan.peek();
414 callbackEvent.getCallback().call();
415 } catch (Exception e) {
// a failing callback must not abort the plan; it is reported with the other problems
416 LOG.error(e.getMessage(), e);
417 occuredExceptions.add(e);
426 LOG.debug("now WAITING for OF_LISTENER to act ..");
// Object.wait(0) blocks until notified, so a proceedTimeout left at its
// default of 0 means waiting without a time limit
427 wait(proceedTimeout);
428 } catch (InterruptedException e) {
429 LOG.error(e.getMessage(), e);
// NOTE(review): the method header is not shown; presumably this is the body of
// run(), which the class has to provide as a Runnable - confirm
436 LOG.debug("|---> evenPlan STARTING ..");
// keep going until every planned event has been consumed
438 while (!eventPlan.isEmpty()) {
// a plan that stops making progress is recorded as an error together with the
// step it stalled on (the stall condition itself is not shown here)
442 occuredExceptions.add(new IllegalStateException(
443 "eventPlan STALLED, planItemCounter="+planItemCounter));
// pause for JOB_DELAY ms; an interruption is only logged
449 Thread.sleep(JOB_DELAY);
450 } catch (InterruptedException e) {
451 LOG.error(e.getMessage(), e);
453 LOG.debug("<---| eventPlan DONE");
457 * @param notificationEvent
/**
 * Delivers a planned notification to the listener callback that matches its
 * runtime type. A notification type matched by none of the branches is
 * recorded in {@link #occuredExceptions} as an {@link IllegalStateException}
 * prefixed with the current step number.
 *
 * @param notificationEvent plan step holding the notification to deliver
 */
459 private synchronized void processNotification(
460 final SwitchTestNotificationEvent notificationEvent) {
462 Notification notification = notificationEvent
463 .getPlannedNotification();
464 LOG.debug("notificating OF_LISTENER: "
465 + notification.getClass().getSimpleName());
// system notification
// NOTE(review): the receiver line is not shown; presumably systemListener, as in disconnect()
468 if (notification instanceof DisconnectEvent) {
470 .onDisconnectEvent((DisconnectEvent) notification);
// OpenFlow protocol notifications
// NOTE(review): where the receiver line is not shown it is presumably ofListener,
// as in the branches that are shown in full
473 else if (notification instanceof EchoRequestMessage) {
475 .onEchoRequestMessage((EchoRequestMessage) notification);
476 } else if (notification instanceof ErrorMessage) {
477 ofListener.onErrorMessage((ErrorMessage) notification);
478 } else if (notification instanceof ExperimenterMessage) {
480 .onExperimenterMessage((ExperimenterMessage) notification);
481 } else if (notification instanceof FlowRemovedMessage) {
483 .onFlowRemovedMessage((FlowRemovedMessage) notification);
484 } else if (notification instanceof HelloMessage) {
485 ofListener.onHelloMessage((HelloMessage) notification);
486 } else if (notification instanceof MultipartReplyMessage) {
488 .onMultipartReplyMessage((MultipartReplyMessage) notification);
489 } else if (notification instanceof MultipartRequestMessage) {
491 .onMultipartRequestMessage((MultipartRequestMessage) notification);
492 } else if (notification instanceof PacketInMessage) {
494 .onPacketInMessage((PacketInMessage) notification);
495 } else if (notification instanceof PortStatusMessage) {
497 .onPortStatusMessage((PortStatusMessage) notification);
// unsupported notification type: reported, not thrown
501 occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " +
502 "message listening not supported for type: "
503 + notification.getClass()));
506 LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
/**
 * Completes the pending rpc future that was registered under the xid of the
 * planned response. A non-null planned response value produces a successful
 * result; a null value produces an unsuccessful result that carries an rpc
 * error ("rpc response failed (planned behavior)"). If no future is registered
 * for the xid, an {@link IllegalStateException} is recorded in
 * {@link #occuredExceptions}.
 *
 * @param rpcResponse plan step holding the xid and the response value
 */
512 private synchronized void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
513 OfHeader plannedRpcResponseValue = rpcResponse
514 .getPlannedRpcResponse();
515 LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
// the map stores futures with a wildcard type, hence the unchecked cast
517 @SuppressWarnings("unchecked")
518 SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
519 .get(rpcResponse.getXid());
521 if (response != null) {
// a planned null value stands for a planned rpc failure
522 boolean successful = plannedRpcResponseValue != null;
523 Collection<RpcError> errors;
525 errors = Collections.emptyList();
528 .newArrayList(RpcErrors
534 "planned response to RPC.id = "
535 + rpcResponse.getXid(),
538 "rpc response failed (planned behavior)")));
540 RpcResult<?> result = Rpcs.getRpcResult(successful,
541 plannedRpcResponseValue, errors);
// completing the future hands the result to whoever holds it
542 response.set(result);
544 String msg = "RpcResponse not expected: xid="
545 + rpcResponse.getXid()
// NOTE(review): plannedRpcResponseValue may be null (planned failure); the
// dereference below would then throw a NullPointerException unless it is
// guarded in lines not shown here - confirm
547 + plannedRpcResponseValue.getClass()
550 occuredExceptions.add(new IllegalStateException("step:"+planItemCounter+" | " + msg));
553 LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
559 * @return rpc future result
/**
 * Creates a new, not yet completed result future and stores it in
 * {@link #rpcResults} under the xid of the request, so that
 * {@link #processRpcResponse} can look it up and complete it. The method itself
 * is not synchronized; the callers shown in this class are.
 * NOTE(review): the parameter line is not shown; presumably {@code IN arg0} - confirm.
 */
561 private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> createAndRegisterRpcResult(
563 SettableFuture<RpcResult<OUT>> result = SettableFuture.create();
// a later registration with the same xid replaces the earlier future in the map
564 rpcResults.put(arg0.getXid(), result);
569 * @return rpc future result
/**
 * Creates a result future for a one-way rpc. The future is completed right
 * away with a successful result that has a null value and no errors, because
 * no response is planned for one-way rpcs.
 */
571 private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
572 SettableFuture<RpcResult<Void>> result = SettableFuture.create();
573 List<RpcError> errors = Collections.emptyList();
574 result.set(Rpcs.getRpcResult(true, (Void) null, errors));
580 * the eventPlan to set
/**
 * Sets the plan to play. The stack is stored as given (no copy is made), so
 * the caller and this adapter share the same instance.
 */
582 public void setEventPlan(Stack<? extends SwitchTestEvent> eventPlan) {
583 this.eventPlan = eventPlan;
587 * @param proceedTimeout
588 * max timeout for processing one planned event (in [ms])
// stores the limit that proceed() passes to wait(..)
590 public void setProceedTimeout(long proceedTimeout) {
591 this.proceedTimeout = proceedTimeout;
595 * @return the occuredExceptions
// returns the internal list itself (not a copy), so later problems show up in it as well
597 public List<Exception> getOccuredExceptions() {
598 return occuredExceptions;
/**
 * Notifies the registered connection-ready listener that the connection is ready.
 * NOTE(review): the listener is used without a null check in the line shown,
 * so it has to be set before this call - confirm.
 */
602 public void fireConnectionReadyNotification() {
603 connectionReadyListener.onConnectionReady();
/**
 * Sets the listener that {@link #fireConnectionReadyNotification()} notifies.
 *
 * @param connectionReadyListener listener to store
 */
607 public void setConnectionReadyListener(
608 ConnectionReadyListener connectionReadyListener) {
609 this.connectionReadyListener = connectionReadyListener;
/**
 * Handles a multipart request (one-way rpc): checks the call against the plan
 * under rpc name "sendMultipartRequestMessage" and prepares an already
 * completed, successful result.
 *
 * @param arg0 multipart request message
 * @return future of the (void) rpc result
 */
613 public Future<RpcResult<Void>> sendMultipartRequestMessage(
614 SendMultipartRequestMessageInput arg0) {
615 checkRpcAndNext(arg0, "sendMultipartRequestMessage");
616 SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();