d473a462c410332781b906c743804bca54d3da8a
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / FlowProgrammerService.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.HashSet;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19
20 import org.eclipse.osgi.framework.console.CommandInterpreter;
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
23 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
24 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
27 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Error;
28 import org.openflow.protocol.OFError;
29 import org.openflow.protocol.OFFlowMod;
30 import org.openflow.protocol.OFFlowRemoved;
31 import org.openflow.protocol.OFMessage;
32 import org.openflow.protocol.OFPort;
33 import org.openflow.protocol.OFType;
34 import org.openflow.protocol.action.OFAction;
35
36 import org.opendaylight.controller.sal.core.ContainerFlow;
37 import org.opendaylight.controller.sal.core.IContainerListener;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.Node.NodeIDType;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
45 import org.opendaylight.controller.sal.match.Match;
46 import org.opendaylight.controller.sal.match.MatchType;
47 import org.opendaylight.controller.sal.utils.GlobalConstants;
48 import org.opendaylight.controller.sal.utils.HexEncode;
49 import org.opendaylight.controller.sal.utils.NodeCreator;
50 import org.opendaylight.controller.sal.utils.StatusCode;
51 import org.opendaylight.controller.sal.utils.Status;
52 import org.osgi.framework.BundleContext;
53 import org.osgi.framework.FrameworkUtil;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * Represents the openflow plugin component in charge of programming the flows
59  * the flow programming and relay them to functional modules above SAL.
60  */
61 public class FlowProgrammerService implements IPluginInFlowProgrammerService,
62         IMessageListener, IContainerListener, IInventoryShimExternalListener,
63         CommandProvider {
64     private static final Logger log = LoggerFactory
65             .getLogger(FlowProgrammerService.class);
66     private IController controller;
67     private ConcurrentMap<String, IFlowProgrammerNotifier> flowProgrammerNotifiers;
68     private Map<String, Set<NodeConnector>> containerToNc;
69     private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
70     private int barrierMessagePriorCount = getBarrierMessagePriorCount();
71
72     public FlowProgrammerService() {
73         controller = null;
74         flowProgrammerNotifiers = new ConcurrentHashMap<String, IFlowProgrammerNotifier>();
75         containerToNc = new HashMap<String, Set<NodeConnector>>();
76         xid2rid = new ConcurrentHashMap<Long, Map<Integer, Long>>();
77     }
78
79     public void setController(IController core) {
80         this.controller = core;
81     }
82
83     public void unsetController(IController core) {
84         if (this.controller == core) {
85             this.controller = null;
86         }
87     }
88
89     public void setFlowProgrammerNotifier(Map<String, ?> props,
90             IFlowProgrammerNotifier s) {
91         if (props == null || props.get("containerName") == null) {
92             log.error("Didn't receive the service correct properties");
93             return;
94         }
95         String containerName = (String) props.get("containerName");
96         this.flowProgrammerNotifiers.put(containerName, s);
97     }
98
99     public void unsetFlowProgrammerNotifier(Map<String, ?> props,
100             IFlowProgrammerNotifier s) {
101         if (props == null || props.get("containerName") == null) {
102             log.error("Didn't receive the service correct properties");
103             return;
104         }
105         String containerName = (String) props.get("containerName");
106         if (this.flowProgrammerNotifiers != null
107                 && this.flowProgrammerNotifiers.containsKey(containerName)
108                 && this.flowProgrammerNotifiers.get(containerName) == s) {
109             this.flowProgrammerNotifiers.remove(containerName);
110         }
111     }
112
113     /**
114      * Function called by the dependency manager when all the required
115      * dependencies are satisfied
116      * 
117      */
118     void init() {
119         this.controller.addMessageListener(OFType.FLOW_REMOVED, this);
120         this.controller.addMessageListener(OFType.ERROR, this);
121         registerWithOSGIConsole();
122     }
123
124     /**
125      * Function called by the dependency manager when at least one dependency
126      * become unsatisfied or when the component is shutting down because for
127      * example bundle is being stopped.
128      * 
129      */
130     void destroy() {
131     }
132
133     /**
134      * Function called by dependency manager after "init ()" is called and after
135      * the services provided by the class are registered in the service registry
136      * 
137      */
138     void start() {
139     }
140
141     /**
142      * Function called by the dependency manager before the services exported by
143      * the component are unregistered, this will be followed by a "destroy ()"
144      * calls
145      * 
146      */
147     void stop() {
148     }
149
150     @Override
151     public Status addFlow(Node node, Flow flow) {
152         return addFlowInternal(node, flow, 0);
153     }
154
155     @Override
156     public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
157         return modifyFlowInternal(node, oldFlow, newFlow, 0);
158     }
159
160     @Override
161     public Status removeFlow(Node node, Flow flow) {
162         return removeFlowInternal(node, flow, 0);
163     }
164
165     @Override
166     public Status addFlowAsync(Node node, Flow flow, long rid) {
167         return addFlowInternal(node, flow, rid);
168     }
169
170     @Override
171     public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
172             long rid) {
173         return modifyFlowInternal(node, oldFlow, newFlow, rid);
174     }
175
176     @Override
177     public Status removeFlowAsync(Node node, Flow flow, long rid) {
178         return removeFlowInternal(node, flow, rid);
179     }
180
181     private Status addFlowInternal(Node node, Flow flow, long rid) {
182         String action = "add";
183         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
184             return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
185                     action, "Invalid node type"));
186         }
187
188         if (controller != null) {
189             ISwitch sw = controller.getSwitch((Long) node.getID());
190             if (sw != null) {
191                 FlowConverter x = new FlowConverter(flow);
192                 OFMessage msg = x.getOFFlowMod(OFFlowMod.OFPFC_ADD, null);
193
194                 Object result;
195                 if (rid == 0) {
196                     /*
197                      * Synchronous message send. Each message is followed by a
198                      * Barrier message.
199                      */
200                     result = sw.syncSend(msg);
201                 } else {
202                     /*
203                      * Message will be sent asynchronously. A Barrier message
204                      * will be inserted automatically to synchronize the
205                      * progression.
206                      */
207                     result = asyncMsgSend(node, sw, msg, rid);  
208                 }
209                 if (result instanceof Boolean) {
210                     return ((Boolean) result == Boolean.TRUE) ? new Status(
211                             StatusCode.SUCCESS, null) : new Status(
212                             StatusCode.TIMEOUT, errorString(null, action,
213                                     "Request Timed Out"));
214                 } else if (result instanceof OFError) {
215                     OFError res = (OFError) result;
216                     if (res.getErrorType() == V6Error.NICIRA_VENDOR_ERRORTYPE) {
217                         V6Error er = new V6Error(res);
218                         byte[] b = res.getError();
219                         ByteBuffer bb = ByteBuffer.allocate(b.length);
220                         bb.put(b);
221                         bb.rewind();
222                         er.readFrom(bb);
223                         return new Status(StatusCode.INTERNALERROR,
224                                 errorString("program", action,
225                                         "Vendor Extension Internal Error"));
226                     }
227                     return new Status(StatusCode.INTERNALERROR, errorString(
228                             "program", action, Utils.getOFErrorString(res)));
229                 } else {
230                     return new Status(StatusCode.INTERNALERROR, errorString(
231                             "send", action, "Internal Error"));
232                 }
233             } else {
234                 return new Status(StatusCode.GONE, errorString("send", action,
235                         "Switch is not available"));
236             }
237         }
238         return new Status(StatusCode.INTERNALERROR, errorString("send", action,
239                 "Internal plugin error"));
240     }
241
242     private Status modifyFlowInternal(Node node, Flow oldFlow, Flow newFlow, long rid) {
243         String action = "modify";
244         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
245             return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
246                     action, "Invalid node type"));
247         }
248         if (controller != null) {
249             ISwitch sw = controller.getSwitch((Long) node.getID());
250             if (sw != null) {
251                 OFMessage msg1 = null, msg2 = null;
252
253                 // If priority and match portion are the same, send a
254                 // modification message
255                 if (oldFlow.getPriority() != newFlow.getPriority()
256                         || !oldFlow.getMatch().equals(newFlow.getMatch())) {
257                     msg1 = new FlowConverter(oldFlow).getOFFlowMod(
258                             OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
259                     msg2 = new FlowConverter(newFlow).getOFFlowMod(
260                             OFFlowMod.OFPFC_ADD, null);
261                 } else {
262                     msg1 = new FlowConverter(newFlow).getOFFlowMod(
263                             OFFlowMod.OFPFC_MODIFY_STRICT, null);
264                 }
265                 /*
266                  * Synchronous message send
267                  */
268                 action = (msg2 == null) ? "modify" : "delete";
269                 Object result;
270                 if (rid == 0) {
271                     /*
272                      * Synchronous message send. Each message is followed by a
273                      * Barrier message.
274                      */
275                     result = sw.syncSend(msg1);
276                 } else {
277                     /*
278                      * Message will be sent asynchronously. A Barrier message
279                      * will be inserted automatically to synchronize the
280                      * progression.
281                      */
282                     result = asyncMsgSend(node, sw, msg1, rid);
283                 }
284                 if (result instanceof Boolean) {
285                     if ((Boolean) result == Boolean.FALSE) {
286                         return new Status(StatusCode.TIMEOUT, errorString(null,
287                                 action, "Request Timed Out"));
288                     } else if (msg2 == null) {
289                         return new Status(StatusCode.SUCCESS, null);
290                     }
291                 } else if (result instanceof OFError) {
292                     return new Status(StatusCode.INTERNALERROR, errorString(
293                             "program", action,
294                             Utils.getOFErrorString((OFError) result)));
295                 } else {
296                     return new Status(StatusCode.INTERNALERROR, errorString(
297                             "send", action, "Internal Error"));
298                 }
299
300                 if (msg2 != null) {
301                     action = "add";
302                     if (rid == 0) {
303                         /*
304                          * Synchronous message send. Each message is followed by a
305                          * Barrier message.
306                          */
307                         result = sw.syncSend(msg2);
308                     } else {
309                         /*
310                          * Message will be sent asynchronously. A Barrier message
311                          * will be inserted automatically to synchronize the
312                          * progression.
313                          */
314                         result = asyncMsgSend(node, sw, msg2, rid);
315                     }
316                     if (result instanceof Boolean) {
317                         return ((Boolean) result == Boolean.TRUE) ? new Status(
318                                 StatusCode.SUCCESS, null) : new Status(
319                                 StatusCode.TIMEOUT, errorString(null, action,
320                                         "Request Timed Out"));
321                     } else if (result instanceof OFError) {
322                         return new Status(StatusCode.INTERNALERROR,
323                                 errorString("program", action, Utils
324                                         .getOFErrorString((OFError) result)));
325                     } else {
326                         return new Status(StatusCode.INTERNALERROR,
327                                 errorString("send", action, "Internal Error"));
328                     }
329                 }
330             } else {
331                 return new Status(StatusCode.GONE, errorString("send", action,
332                         "Switch is not available"));
333             }
334         }
335         return new Status(StatusCode.INTERNALERROR, errorString("send", action,
336                 "Internal plugin error"));
337     }
338
339     private Status removeFlowInternal(Node node, Flow flow, long rid) {
340         String action = "remove";
341         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
342             return new Status(StatusCode.NOTACCEPTABLE, errorString("send",
343                     action, "Invalid node type"));
344         }
345         if (controller != null) {
346             ISwitch sw = controller.getSwitch((Long) node.getID());
347             if (sw != null) {
348                 OFMessage msg = new FlowConverter(flow).getOFFlowMod(
349                         OFFlowMod.OFPFC_DELETE_STRICT, OFPort.OFPP_NONE);
350                 Object result;
351                 if (rid == 0) {
352                     /*
353                      * Synchronous message send. Each message is followed by a
354                      * Barrier message.
355                      */
356                     result = sw.syncSend(msg);
357                 } else {
358                     /*
359                      * Message will be sent asynchronously. A Barrier message
360                      * will be inserted automatically to synchronize the
361                      * progression.
362                      */
363                     result = asyncMsgSend(node, sw, msg, rid);
364                 }
365                 if (result instanceof Boolean) {
366                     return ((Boolean) result == Boolean.TRUE) ? new Status(
367                             StatusCode.SUCCESS, null) : new Status(
368                             StatusCode.TIMEOUT, errorString(null, action,
369                                     "Request Timed Out"));
370                 } else if (result instanceof OFError) {
371                     return new Status(StatusCode.INTERNALERROR, errorString(
372                             "program", action,
373                             Utils.getOFErrorString((OFError) result)));
374                 } else {
375                     return new Status(StatusCode.INTERNALERROR, errorString(
376                             "send", action, "Internal Error"));
377                 }
378             } else {
379                 return new Status(StatusCode.GONE, errorString("send", action,
380                         "Switch is not available"));
381             }
382         }
383         return new Status(StatusCode.INTERNALERROR, errorString("send", action,
384                 "Internal plugin error"));
385     }
386
387     @Override
388     public Status removeAllFlows(Node node) {
389         return new Status(StatusCode.SUCCESS, null);
390     }
391
392     private String errorString(String phase, String action, String cause) {
393         return "Failed to "
394                 + ((phase != null) ? phase + " the " + action
395                         + " flow message: " : action + " the flow: ") + cause;
396     }
397
398     @Override
399     public void receive(ISwitch sw, OFMessage msg) {
400         if (msg instanceof OFFlowRemoved) {
401             handleFlowRemovedMessage(sw, (OFFlowRemoved) msg);
402         } else if (msg instanceof OFError) {
403             handleErrorMessage(sw, (OFError) msg);
404         }
405     }
406
407     private void handleFlowRemovedMessage(ISwitch sw, OFFlowRemoved msg) {
408         Node node = NodeCreator.createOFNode(sw.getId());
409         Flow flow = new FlowConverter(msg.getMatch(),
410                 new ArrayList<OFAction>(0)).getFlow(node);
411         flow.setPriority(msg.getPriority());
412         flow.setIdleTimeout(msg.getIdleTimeout());
413         flow.setId(msg.getCookie());
414
415         Match match = flow.getMatch();
416         NodeConnector inPort = match.isPresent(MatchType.IN_PORT) ? (NodeConnector) match
417                 .getField(MatchType.IN_PORT).getValue() : null;
418
419         for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
420                 .entrySet()) {
421             String container = containerNotifier.getKey();
422             IFlowProgrammerNotifier notifier = containerNotifier.getValue();
423             /*
424              * Switch only provide us with the match information. For now let's
425              * try to identify the container membership only from the input port
426              * match field. In any case, upper layer consumers can derive
427              * whether the notification was not for them. More sophisticated
428              * filtering can be added later on.
429              */
430             if (inPort == null
431                     || container.equals(GlobalConstants.DEFAULT.toString())
432                     || this.containerToNc.get(container).contains(inPort)) {
433                 notifier.flowRemoved(node, flow);
434             }
435         }
436     }
437
438     private void handleErrorMessage(ISwitch sw, OFError errorMsg) {
439         Node node = NodeCreator.createOFNode(sw.getId());
440         OFMessage offendingMsg = errorMsg.getOffendingMsg();
441         Integer xid;
442         if (offendingMsg != null) {
443             xid = offendingMsg.getXid();
444         } else {
445             xid = errorMsg.getXid();
446         }
447
448         Long rid = getMessageRid(sw.getId(), xid);
449         /*
450          * Null or zero requestId indicates that the error message is meant for
451          * a sync message. It will be handled by the sync message worker thread.
452          * Hence we are done here.
453          */
454         if ((rid == null) || (rid == 0)) {
455             return;
456         }
457         
458         /*
459          * Notifies the caller that error has been reported for a previous flow
460          * programming request
461          */
462         for (Map.Entry<String, IFlowProgrammerNotifier> containerNotifier : flowProgrammerNotifiers
463                 .entrySet()) {
464             IFlowProgrammerNotifier notifier = containerNotifier.getValue();
465             notifier.flowErrorReported(node, rid, errorMsg);
466         }
467     }
468
469     @Override
470     public void tagUpdated(String containerName, Node n, short oldTag,
471             short newTag, UpdateType t) {
472
473     }
474
475     @Override
476     public void containerFlowUpdated(String containerName,
477             ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
478     }
479
480     @Override
481     public void nodeConnectorUpdated(String containerName, NodeConnector p,
482             UpdateType type) {
483         Set<NodeConnector> target = null;
484
485         switch (type) {
486         case ADDED:
487             if (!containerToNc.containsKey(containerName)) {
488                 containerToNc.put(containerName, new HashSet<NodeConnector>());
489             }
490             containerToNc.get(containerName).add(p);
491             break;
492         case CHANGED:
493             break;
494         case REMOVED:
495             target = containerToNc.get(containerName);
496             if (target != null) {
497                 target.remove(p);
498             }
499             break;
500         default:
501         }
502     }
503
504     @Override
505     public void containerModeUpdated(UpdateType t) {
506
507     }
508
509     @Override
510     public Status sendBarrierMessage(Node node) {
511         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
512             return new Status(StatusCode.NOTACCEPTABLE,
513                     "The node does not support Barrier message.");
514         }
515
516         if (controller != null) {
517             long swid = (Long) node.getID();
518             ISwitch sw = controller.getSwitch(swid);
519             if (sw != null) {
520                 sw.sendBarrierMessage();
521                 clearXid2Rid(swid);
522                 return (new Status(StatusCode.SUCCESS, null));
523             } else {
524                 return new Status(StatusCode.GONE,
525                         "The node does not have a valid Switch reference.");
526             }
527         }
528         return new Status(StatusCode.INTERNALERROR,
529                 "Failed to send Barrier message.");
530     }
531     
532     /**
533      * This method sends the message asynchronously until the number of messages
534      * sent reaches a threshold. Then a Barrier message is sent automatically
535      * for sync purpose. An unique Request ID associated with the message is
536      * passed down by the caller. The Request ID will be returned to the caller
537      * when an error message is received from the switch.
538      * 
539      * @param node
540      *            The node
541      * @param msg
542      *            The switch
543      * @param msg
544      *            The OF message to be sent
545      * @param rid
546      *            The Request Id
547      * @return result
548      */
549     private Object asyncMsgSend(Node node, ISwitch sw, OFMessage msg, long rid) {
550         Object result = Boolean.TRUE;
551         long swid = (Long) node.getID();
552         int xid;
553
554         xid = sw.asyncSend(msg);
555         addXid2Rid(swid, xid, rid);
556         
557         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
558         if (swxid2rid == null) {
559             return result;
560         }
561         
562         int size = swxid2rid.size();
563         if (size % barrierMessagePriorCount == 0) {
564             result = sendBarrierMessage(node);
565         }
566         
567         return result;
568     }
569     
570     /**
571      * A number of async messages are sent followed by a synchronous Barrier
572      * message. This method returns the maximum async messages that can be sent
573      * before the Barrier message.
574      * 
575      * @return The max count of async messages sent prior to Barrier message
576      */
577     private int getBarrierMessagePriorCount() {
578         String count = System.getProperty("of.barrierMessagePriorCount");
579         int rv = 100;
580
581         if (count != null) {
582             try {
583                 rv = Integer.parseInt(count);
584             } catch (Exception e) {
585             }
586         }
587
588         return rv;
589     }
590     
591     /**
592      * This method returns the message Request ID previously assigned by the
593      * caller for a given OF message xid
594      * 
595      * @param swid
596      *            The switch id
597      * @param xid
598      *            The OF message xid
599      * @return The Request ID
600      */
601     private Long getMessageRid(long swid, Integer xid) {
602         Long rid = null;
603
604         if (xid == null) {
605             return rid;
606         }
607
608         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
609         if (swxid2rid != null) {
610             rid = swxid2rid.get(xid);
611         }
612         return rid;
613     }
614
615     /**
616      * This method returns a copy of outstanding xid to rid mappings.for a given
617      * switch
618      * 
619      * @param swid
620      *            The switch id
621      * @return a copy of xid2rid mappings
622      */
623     public Map<Integer, Long> getSwXid2Rid(long swid) {
624         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
625         
626         if (swxid2rid != null) {
627             return new HashMap<Integer, Long>(swxid2rid);
628         } else {
629             return new HashMap<Integer, Long>();
630         }
631     }
632
633     /**
634      * Adds xid to rid mapping to the local DB
635      * 
636      * @param swid
637      *            The switch id
638      * @param xid
639      *            The OF message xid
640      * @param rid
641      *            The message Request ID
642      */
643     private void addXid2Rid(long swid, int xid, long rid) {
644         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
645         if (swxid2rid != null) {
646             swxid2rid.put(xid, rid);
647         }
648     }
649
650     /**
651      * When an Error message is received, this method will be invoked to remove
652      * the offending xid from the local DB.
653      * 
654      * @param swid
655      *            The switch id
656      * @param xid
657      *            The OF message xid
658      */
659     private void removeXid2Rid(long swid, int xid) {
660         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
661         if (swxid2rid != null) {
662             swxid2rid.remove(xid);
663         }
664     }
665
666     /**
667      * When a Barrier reply is received, this method will be invoked to clear
668      * the local DB
669      * 
670      * @param swid
671      *            The switch id
672      */
673     private void clearXid2Rid(long swid) {
674         Map<Integer, Long> swxid2rid = this.xid2rid.get(swid);
675         if (swxid2rid != null) {
676             swxid2rid.clear();
677         }
678     }
679
680     @Override
681     public void updateNode(Node node, UpdateType type, Set<Property> props) {
682         long swid = (Long)node.getID();
683         
684         switch (type) {
685         case ADDED:
686             Map<Integer, Long> swxid2rid = new HashMap<Integer, Long>();
687             this.xid2rid.put(swid, swxid2rid);
688             break;
689         case CHANGED:
690             break;
691         case REMOVED:
692             this.xid2rid.remove(swid);
693             break;
694         default:
695         }
696     }
697
698     @Override
699     public void updateNodeConnector(NodeConnector nodeConnector,
700             UpdateType type, Set<Property> props) {
701     }
702
703     private void registerWithOSGIConsole() {
704         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
705                 .getBundleContext();
706         bundleContext.registerService(CommandProvider.class.getName(), this,
707                 null);
708     }
709
710     @Override
711     public String getHelp() {
712         StringBuffer help = new StringBuffer();
713         help.append("-- Flow Programmer Service --\n");
714         help.append("\t px2r <node id>          - Print outstanding xid2rid mappings for a given node id\n");
715         help.append("\t px2rc                   - Print max num of async msgs prior to the Barrier\n");
716         return help.toString();
717     }
718
719     public void _px2r(CommandInterpreter ci) {
720         String st = ci.nextArgument();
721         if (st == null) {
722             ci.println("Please enter a valid node id");
723             return;
724         }
725         
726         long sid;
727         try {
728             sid = HexEncode.stringToLong(st);
729         } catch (NumberFormatException e) {
730             ci.println("Please enter a valid node id");
731             return;
732         }
733         
734         Map<Integer, Long> swxid2rid = this.xid2rid.get(sid);
735         if (swxid2rid == null) {
736             ci.println("The node id entered does not exist");
737             return;
738         }
739
740         ci.println("xid             rid");
741         
742         Set<Integer> xidSet = swxid2rid.keySet();
743         if (xidSet == null) {
744             return;
745         }
746
747         for (Integer xid : xidSet) {
748             ci.println(xid + "       " + swxid2rid.get(xid));
749         }
750     }
751
752     public void _px2rc(CommandInterpreter ci) {
753         ci.println("Max num of async messages sent prior to the Barrier message is "
754                 + barrierMessagePriorCount);
755     }
756 }