SFC-PY - NFQ classifier updates 77/16277/3
authorDusan Madar <[email protected]>
Tue, 10 Mar 2015 15:51:44 +0000 (16:51 +0100)
committerReinaldo Penno <[email protected]>
Fri, 13 Mar 2015 02:27:37 +0000 (02:27 +0000)
- update docstrings
- update the internal data-store structure to hold the RSP name as well
- add name based RSP removal, add the functionality to remove all RSPs related to a given ACL (i.e. remove ACL)
- rename few methods to be more descriptive, move some of them to form a logical sequence in the file
- remove useless comments and constants

- update packet marking - packet mark is now a combination of RSP ID and IP version
- add methods to compose/decompose packet mark
- update packet processing and forwarding

Change-Id: I471c941b183da13cbe500149acbde60782b0ac15
Signed-off-by: Dusan Madar <[email protected]>
sfc-py/classifier/classifier.py

index c75eb48bab385448bbc1a72ffe669c1a85bbb0ed..d349f47bd0e7ee08fd0719aa9cf026db7c2c6aa2 100644 (file)
@@ -22,9 +22,9 @@ from nsh.common import VXLANGPE, BASEHEADER, CONTEXTHEADER
 
 
 __author__ = 'Martin Lauko, Dusan Madar'
 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
 __version__ = "0.1"
 __status__ = "alpha"
 
 
@@ -39,22 +39,23 @@ received packets accordingly.
 # nfq -> NetFilterQueue
 # nsh -> Network Service Headers
 # fwd -> forwarder/forwarding
-# tun -> tunnel
-# sp -> Service Path
-# spi -> Service Path Id
 # acl -> access list
 # ace -> access list entry
+# `rule(s)` and `chain(s)` can be used interchangeably in ip(6)tables context
 
 
 logger = logging.getLogger('classifier')
 logger.setLevel(logging.DEBUG)
 
+# silence `requests` module logging
+requests_logger = logging.getLogger('requests')
+requests_logger.setLevel(logging.WARNING)
+
 
 #: constants
-NFQ = 'NFQUEUE'
-NFQ_NUMBER = 2
 IPV4 = 4
 IPv6 = 6
+NFQ_NUMBER = 2
 
 
 #: ACE items to ip(6)tables flags/types mapping
@@ -74,6 +75,11 @@ ace_2_iptables = {'source-ips': {'flag': '-s',
                             'destination-port-range': '--dport'}
                   }
 
+#: IP version to NSH next protocol mapping
+ipv_2_next_protocol = {4: 0x1,      # IPv4
+                       6: 0x2,      # IPv6
+                       10: 0x3}     # Ethernet
+
 
 def run_cmd(cmd):
     """
@@ -84,12 +90,20 @@ def run_cmd(cmd):
 
     """
     cmd = [str(cmd_part) for cmd_part in cmd]
+    logger.debug('Executing command: %s', ' '.join(cmd))
 
     try:
-        logger.debug('Executing command: `%s`', ' '.join(cmd))
+        process = subprocess.Popen(cmd,
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE)
+
+        _, err = process.communicate()
+
+        if process.returncode != 0:
+            err = err.strip()
+            logger.exception(err.decode())
 
-        subprocess.check_call(cmd)
-    except subprocess.CalledProcessError:
+    except OSError:
         logger.exception('Command execution failed')
 
 
@@ -118,7 +132,7 @@ def run_iptables_cmd(arguments, ipv):
     iptables = 'iptables'
     ip6tables = 'ip6tables'
 
-    if IPV4 in ipv and IPv6 in ipv:
+    if (IPV4 in ipv) and (IPv6 in ipv):
         ip_tables = (iptables, ip6tables)
     elif IPV4 in ipv:
         ip_tables = (iptables,)
@@ -155,53 +169,61 @@ class NfqClassifier(metaclass=Singleton):
 
         How it works:
         1. sfc_agent receives an ACL and passes it for processing
-        2. the RSP referenced by ACL is requested from ODL
+        2. the RSP (its SFF locator) referenced by ACL is requested from ODL
         3. if the RSP exists in the ODL iptables rules for it are applied
 
         After this process is over, every packet successfully matched to an
-        iptables rule will be NSH encapsulated and traverses appropriate RSP.
+        iptables rule (i.e. successfully classified) will be NSH encapsulated
+        and forwarded to a related SFF, which knows how to traverse the RSP.
 
         Rules are created using appropriate iptables command. If the ACE rule
         is MAC address related both iptables and ip6tabeles rules re issued.
         If ACE rule is IPv4 address related, only iptables rules are issued,
         same for IPv6.
 
-        ACL            RULES
+        ACL            RULES FOR
         ----------------------------------
         MAC            iptables, ip6tables
         IPv4           iptables
         IPv6           ip6tables
 
-        Information regarding redirection to RSP(s) are stored as a simple RSP
-        to SFF mapping (rsp_id -> sff); which is represented as
-        a dictionary:
-
-        {rsp_id: {'ip': <ip>,
-                  'port': <port>,
-                  'starting-index': <starting-index>,
-                  'transport-type': <transport-type>,
-                  'chains': {'chain_name': (<ipv>,)}, ...},
+        Information regarding already registered RSP(s) are stored in an
+        internal data-store, which is represented as a dictionary:
+
+        {rsp_id: {'name': <rsp_name>,
+                  'chains': {'chain_name': (<ipv>,),
+                             ...
+                             },
+                  'sff': {'ip': <ip>,
+                          'port': <port>,
+                          'starting-index': <starting-index>,
+                          'transport-type': <transport-type>
+                          },
+                  },
         ...
         }
 
         Where:
-            - ip: SFF IP
-            - port: SFF port
-            - starting-index: index given to packet at first RSP hop
-            - transport-type:
-            - chains: dict of iptables chains (rules) related to the RSP
+            - name: RSP name
+            - chains: dict of iptables rules/chains related to the RSP
+            - SFF:
+                - ip: SFF IP
+                - port: SFF port
+                - starting-index: index given to packet at first RSP hop
+                - transport-type:
 
         """
         # internal data-store
         self.rsp_2_sff = {}
 
-        self.nfq = NetfilterQueue()
+        # NFQ for classified packets, initialized by packet_collector()
+        self.nfq = None
 
         # socket used to forward NSH encapsulated packets
         self.fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
         # identifiers of the currently processed RSP, set by process_acl()
-        # these will be different for each processed ACL
+        # these will be different for each processed ACL/ACE
         self.rsp_id = None
         self.rsp_acl = None
         self.rsp_ace = None
@@ -209,7 +231,11 @@ class NfqClassifier(metaclass=Singleton):
 
         # IP version of the currently processed RSP, set by parse_ace()
         # this attribute serves as run_iptables_cmd() 'ipv' argument
-        self.rsp_ipv = (IPV4, IPv6)
+        self.rsp_ipv = None
+
+        # currently processed RSP mark, set by parse_ace()
+        # this attribute serves as the ip(6)tables mark argument
+        self.rsp_mark = None
 
     def _get_current_ip_version(self, ip):
         """
@@ -229,7 +255,23 @@ class NfqClassifier(metaclass=Singleton):
 
         return ip.version
 
-    def _fetch_rsp_from_odl(self, rsp_name):
+    def _get_rsp_by_name(self, rsp_name):
+        """
+        Retrieve RSP data from the data-store based on its name
+
+        :param rsp_name: RSP name
+        :type rsp_name: str
+
+        :return tuple (rsp_id, rsp_data) or None
+
+        """
+        for rsp_id, rsp_data in self.rsp_2_sff.items():
+            if rsp_name == rsp_data['name']:
+                return rsp_id, rsp_data
+        else:
+            return None
+
+    def _fetch_rsp_first_hop_from_odl(self, rsp_name):
         """
         Fetch RSPs' forwarding parameters (SFF locator) from ODL
 
@@ -246,44 +288,54 @@ class NfqClassifier(metaclass=Singleton):
                     {'name': rsp_name}
                 }
 
-        logger.info('Requesting RSP "%s" from ODL', rsp_name)
-        rsp_data = requests.post(url=url,
-                                 data=json.dumps(data),
-                                 auth=(USERNAME, PASSWORD),
-                                 headers={'content-type': 'application/json'})
+        headers = {'content-type': 'application/json'}
+
+        try:
+            logger.info('Requesting SFF for RSP "%s" from ODL', rsp_name)
+            rsp_data = requests.post(url=url,
+                                     timeout=5,
+                                     headers=headers,
+                                     data=json.dumps(data),
+                                     auth=(USERNAME, PASSWORD))
+        except requests.exceptions.Timeout:
+            logger.exception('Failed to get RSP "%s" from ODL: timeout',
+                             rsp_name)
+            return
 
         if not rsp_data.content:
             logger.warning('RSP "%s" not found in ODL', rsp_name)
-            return None
+            return
 
         rsp_json = rsp_data.json()
         return rsp_json['output']['rendered-service-path-first-hop']
 
-    def _process_packet(self, packet):
+    def _compose_packet_mark(self):
         """
-        Main NFQ callback for each queued packet.
-        Drop the packet if RSP is unknown, pass it for processing otherwise.
+        Compose packet mark as a combination of the RSP ID and IP version
 
-        :param packet: packet to process
-        :type packet: `:class:netfilterqueue.Packet`
+        :return int
 
         """
-        try:
-            # packet mark contains a RSP identifier
-            rsp_id = packet.get_mark()
+        return int(str(self.rsp_id) + "%02d" % sum(self.rsp_ipv))
+
+    def _decompose_packet_mark(self, mark):
+        """
+        Decompose packet mark to the RSP ID and IP version
 
-            logger.debug('NFQ received a %s, marked "%d"', packet, rsp_id)
+        :param mark: packet mark
+        :type mark: int
 
-            if rsp_id in self.rsp_2_sff:
-                self.forward_packet(packet, rsp_id)
-            else:
-                logger.warning('Dropping packet as it did\'t match any rule')
-                packet.drop()
+        :return tuple
 
-        except:
-            logger.exception('NFQ failed to receive a packet')
+        """
+        mark = str(mark)
+
+        rsp_id = int(mark[:-2])
+        ipv = int(mark[-2:])
+
+        return rsp_id, ipv
 
-    def forward_packet(self, packet, rsp_id):
+    def forward_packet(self, packet, rsp_id, ipv):
         """
         Encapsulate given packet with NSH and forward it to SFF related with
         currently matched RSP
@@ -292,22 +344,86 @@ class NfqClassifier(metaclass=Singleton):
         :type packet: `:class:netfilterqueue.Packet`
         :param rsp_id: RSP identifier
         :type rsp_id: int
+        :param ipv: IP version
+        :type ipv: int
 
         """
-        fwd_to = self.rsp_2_sff[rsp_id]
+        fwd_to = self.rsp_2_sff[rsp_id]['sff']
+        next_protocol = ipv_2_next_protocol[ipv]
 
-        # TODO: get context headers from ODL, using default values for now
+        # NOTES:
+        # so far metadata are not supported -> just sending an empty ctx_header
+        # tunnel_id (0x0500) is hard-coded, will it be always the same?
         ctx_header = CONTEXTHEADER(0, 0, 0, 0)
-        # TODO: tunnel_id (0x0500) is hard-coded
         vxlan_header = VXLANGPE(int('00000100', 2), 0, 0x894F, 0x0500, 64)
-        base_header = BASEHEADER(0x1, int('01000000', 2), 0x6, 0x1, 0x1,
-                                 rsp_id, fwd_to['starting-index'])
+        base_header = BASEHEADER(0x1, int('01000000', 2), 0x6,
+                                 0x1, next_protocol, rsp_id,
+                                 fwd_to['starting-index'])
 
         nsh_encapsulation = build_packet(vxlan_header, base_header, ctx_header)
         nsh_packet = nsh_encapsulation + packet.get_payload()
 
         self.fwd_socket.sendto(nsh_packet, (fwd_to['ip'], fwd_to['port']))
 
+    def process_packet(self, packet):
+        """
+        Main NFQ callback for each classified packet.
+        Drop the packet if RSP is unknown, pass it for processing otherwise.
+
+        :param packet: packet to process
+        :type packet: `:class:netfilterqueue.Packet`
+
+        """
+        try:
+            mark = packet.get_mark()
+            rsp_id, ipv = self._decompose_packet_mark(mark)
+
+            logger.debug('NFQ received a %s, marked "%d"', packet, mark)
+
+            if rsp_id in self.rsp_2_sff:
+                self.forward_packet(packet, rsp_id, ipv)
+            else:
+                logger.warning('Dropping packet as it did\'t match any rule')
+                packet.drop()
+
+        except:
+            logger.exception('NFQ failed to receive a packet')
+
+    def packet_collector(self):
+        """
+        Main NFQ related method. Configure the queue and wait for packets.
+
+        NOTE: NetfilterQueue.run() blocs!
+
+        """
+        try:
+            self.nfq = NetfilterQueue()
+
+            logger.info('Binding to NFQ queue number "%s"', NFQ_NUMBER)
+            self.nfq.bind(NFQ_NUMBER, self.process_packet)
+        except:
+            msg = ('Failed to bind to the NFQ queue number "%s". '
+                   'HINT: try to run command `sudo iptables -L` to check if '
+                   'the required queue is available.' % NFQ_NUMBER)
+
+            logger.exception(msg)
+            raise
+
+        try:
+            logger.info('Starting NFQ - waiting for packets ...')
+            self.nfq.run()
+        except:
+            logger.exception('Failed to start NFQ')
+            raise
+
+    def collect_packets(self):
+        """
+        Start a thread for classified packets collection
+        """
+        nfq_thread = threading.Thread(target=self.packet_collector)
+        nfq_thread.daemon = True
+        nfq_thread.start()
+
     def parse_ace(self, ace_matches):
         """
         Parse given Access List Entries (ACE) matches and put together an
@@ -378,55 +494,74 @@ class NfqClassifier(metaclass=Singleton):
         if source_mac in ace_matches:
             ace_rule_cmd.extend(['-m', 'mac', '--mac-source'])
             ace_rule_cmd.append(ace_matches[source_mac])
+            self.rsp_ipv = (IPV4, IPv6)
 
-        ace_rule_cmd.extend(['-j', 'MARK', '--set-mark', self.rsp_id])
+        self.rsp_mark = self._compose_packet_mark()
+        ace_rule_cmd.extend(['-j', 'MARK', '--set-mark', self.rsp_mark])
         return ace_rule_cmd
 
     def process_acl(self, acl_data):
         """
-        Parse ACL data and create iptables rules accordingly
+        Parse ACL data and create/remove ip(6)tables rules accordingly.
+
+        To be able to create/remove an ip(6)tables rule/chain these attributes
+        must be set (i.e. not None):
+        self.rsp_chain, self.rsp_ipv + self.rsp_id for creating a rule/chain
 
         :param acl_data: ACL
         :type acl_data: dict
 
         """
         for acl in acl_data['access-list']:
-
-            self.rsp_acl = acl['acl-name']
+            self.rsp_acl = acl['acl-name'].upper()
 
             for ace in acl['access-list-entries']:
+                if 'delete' in ace:
+                    self.remove_acl_rsps()
+                    return
+
                 rsp_name = (ace['actions']
                                ['service-function-acl:rendered-service-path'])
 
-                rsp = self._fetch_rsp_from_odl(rsp_name)
-                if rsp is None:
+                sff_data = self._fetch_rsp_first_hop_from_odl(rsp_name)
+                if sff_data is None:
                     continue
 
-                self.rsp_id = rsp['path-id']
-                self.rsp_ace = ace['rule-name']
+                # NOTE: assuming that RSP IDs are unique
+                rsp_id = sff_data['path-id']
+                if rsp_id in self.rsp_2_sff:
+                    logger.warning('RSP "%s" already exists', rsp_id)
+                    continue
+
+                self.rsp_id = rsp_id
+                self.rsp_ace = ace['rule-name'].upper()
                 self.rsp_chain = '-'.join((self.rsp_acl,
                                            self.rsp_ace,
                                            'RSP',
-                                           str(rsp['path-id'])))
+                                           str(rsp_id)))
 
-                # `self.rsp_ipv` is set by this
+                # `self.rsp_ipv` and `self.rsp_mark` are set by this
                 ace_rule_cmd = self.parse_ace(ace['matches'])
 
-                # TODO: test - this will very probably not work properly
-                if 'delete' in ace:
-                    self.remove_rsp()
-                    return
-
-                self.create_rsp(rsp)
+                self.create_rsp(rsp_name, sff_data)
                 run_iptables_cmd(ace_rule_cmd, self.rsp_ipv)
 
     def register_rsp(self):
         """
         Create iptables rules for the current ACL -> ACE -> RSP
 
-        In other words: create an iptables chain for the given RSP, direct all
-        incoming packets through this chain and send matched packets (matching
-        is mark based) to the NetfilterQueue.
+        In other words: create an iptables chain for the current RSP, mark
+        traversing packets and redirect them to the NFQ.
+
+        Packet mark (which must be an integer) is a combination of the RSP ID
+        and the IP version for which an ip(6)tables rule/chain exists.
+        IP version is described by the last two mark digits, i.e. 04 -> IPv4,
+        06 -> IPv6, 10 -> IPv4 and IPv6.
+
+        For example a '104' mark describes RSP "1" for which an IPv4 iptables
+        rule exists. Mark 5010 describes RSP "50" for which both an IPv4 and
+        an IPv6 ip(6)tables rules exists.
+
         """
         logger.debug('Creating iptables rule for ACL "%s", ACE "%s", RSP "%s"',
                      self.rsp_acl, self.rsp_ace, self.rsp_id)
@@ -440,43 +575,40 @@ class NfqClassifier(metaclass=Singleton):
                           '-j', self.rsp_chain],
                          self.rsp_ipv)
 
-        # append [-A] a redirection of matched packets to the NetfilterQueue
+        # append [-A] packet marking and redirection to the NFQ
         run_iptables_cmd(['-A', self.rsp_chain,
-                          '-m', 'mark', '--mark', self.rsp_id,
-                          '-j', NFQ, '--queue-num', NFQ_NUMBER],
+                          '-m', 'mark', '--mark', self.rsp_mark,
+                          '-j', 'NFQUEUE', '--queue-num', NFQ_NUMBER],
                          self.rsp_ipv)
 
-    def create_rsp(self, rsp):
+    def create_rsp(self, rsp_name, sff_data):
         """
         Create iptables rules for the current RSP and add it to the data-store
 
-        :param rsp: RSPs' SFF description
-        :type rsp: dict
+        :param rsp_name: RSP name
+        :type rsp_name: str
+        :param sff_data: RSPs' SFF description
+        :type sff_data: dict
 
         """
         self.register_rsp()
 
         if self.rsp_id not in self.rsp_2_sff:
-            rsp.pop('path-id')
-            self.rsp_2_sff[self.rsp_id] = rsp
+            self.rsp_2_sff[self.rsp_id] = {'name': rsp_name}
+
+        if 'sff' not in self.rsp_2_sff[self.rsp_id]:
+            sff_data.pop('path-id')
+            self.rsp_2_sff[self.rsp_id]['sff'] = sff_data
 
         if 'chains' not in self.rsp_2_sff[self.rsp_id]:
             self.rsp_2_sff[self.rsp_id]['chains'] = {}
 
         self.rsp_2_sff[self.rsp_id]['chains'][self.rsp_chain] = self.rsp_ipv
 
-    def unregister_rsp(self, log=True):
+    def unregister_rsp(self):
         """
         Remove iptables rules for the current RSP
-
-        :param log: flag to log the message regarding removing RSP
-        :type log: bool
-
         """
-        if log:
-            logger.debug('Removing iptables rule for ACL "%s", ACE "%s", RSP '
-                         '"%s"', self.rsp_acl, self.rsp_ace, self.rsp_id)
-
         # delete [-D] the jump to the chain
         run_iptables_cmd(['-D', 'PREROUTING',
                           '-j', self.rsp_chain],
@@ -490,22 +622,72 @@ class NfqClassifier(metaclass=Singleton):
         run_iptables_cmd(['-X', self.rsp_chain],
                          self.rsp_ipv)
 
-    def remove_rsp(self):
+    def remove_rsp(self, rsp_name):
         """
-        Remove iptables rules for the current RSP and remove it from the
-        data-store
+        Remove ip(6)tables rules/chains for a given RSP and remove it from the
+        data-store as well; state (return) if the removal was succesfull.
+
+        :param rsp_name: RSP name
+        :type rsp_name: str
+
+        :return bool
+
+        """
+        try:
+            rsp_id, rsp_data = self._get_rsp_by_name(rsp_name)
+        except TypeError:
+            return False
+
+        logger.debug('Removing iptables rules for RSP "%s"', rsp_id)
+
+        for chain_name, ipv in rsp_data['chains'].items():
+            self.rsp_chain = chain_name
+            self.rsp_ipv = ipv
+
+            self.unregister_rsp()
+
+        del self.rsp_2_sff[rsp_id]
+        return True
+
+    def remove_acl_rsps(self):
+        """
+        Remove ip(6)tables rules/chains related to the current ACL
         """
-        self.unregister_rsp()
-        del self.rsp_2_sff[self.rsp_id]
+        rsps_to_remove = []
+
+        for rsp_id in self.rsp_2_sff:
+            rsp = self.rsp_2_sff[rsp_id]
+            rsp_chains = rsp['chains']
+
+            chains_to_remove = [chain_name for chain_name in rsp_chains.keys()
+                                if self.rsp_acl in chain_name]
+
+            if not chains_to_remove:
+                continue
+
+            logger.debug('Removing iptables rule for ACL "%s"', self.rsp_acl)
+
+            for chain_name in chains_to_remove:
+                self.rsp_chain = chain_name
+                self.rsp_ipv = rsp_chains.pop(chain_name)
+
+                self.unregister_rsp()
+
+            if not rsp_chains:
+                rsps_to_remove.append(rsp_id)
+
+        for rsp_id in rsps_to_remove:
+            del self.rsp_2_sff[rsp_id]
 
     def remove_all_rsps(self):
         """
-        Remove iptables rules for ALL registered RSPs and clear the data-store
+        Remove ip(6)tables rules for ALL registered RSPs and clear the
+        data-store
         """
         if not self.rsp_2_sff:
             return
 
-        logger.debug('Removing iptables rule(s) for ALL RSPs')
+        logger.debug('Removing created iptables rule(s) for ALL RSPs')
 
         for rsp_id in self.rsp_2_sff:
             rsp = self.rsp_2_sff[rsp_id]
@@ -514,42 +696,21 @@ class NfqClassifier(metaclass=Singleton):
                 self.rsp_ipv = ipv
                 self.rsp_chain = chain
 
-                self.unregister_rsp(log=False)
+                self.unregister_rsp()
 
         self.rsp_2_sff = {}
 
-    def packet_collector(self):
-        """
-        Main NFQ related method. Configure the queue and wait for packets.
-
-        NOTE: NetfilterQueue.run() blocs!
-
+    def nfq_running(self):
         """
-        try:
-            logger.info('Binding to NFQ queue number "%s"', NFQ_NUMBER)
-            self.nfq.bind(NFQ_NUMBER, self._process_packet)
-        except:
-            msg = ('Failed to bind to the NFQ queue number "%s". '
-                   'HINT: try to run command `sudo iptables -L` to check if '
-                   'the required queue is available.' % NFQ_NUMBER)
+        Check if the NFQ is running
 
-            logger.exception(msg)
-            raise
-
-        try:
-            logger.info('Starting NFQ - waiting for packets ...')
-            self.nfq.run()
-        except:
-            logger.exception('Failed to start NFQ')
-            raise
+        :return bool
 
-    def collect_packets(self):
-        """
-        Start a thread for NFQ packets collection
         """
-        nfq_thread = threading.Thread(target=self.packet_collector)
-        nfq_thread.daemon = True
-        nfq_thread.start()
+        if self.nfq is None:
+            return False
+        else:
+            return True
 
 
 def start_classifier():
@@ -567,5 +728,5 @@ def clear_classifier():
     """
     nfq_classifier = NfqClassifier()
 
-    nfq_classifier.remove_all_rsps()
-    nfq_classifier.nfq.unbind()
+    if nfq_classifier.nfq_running():
+        nfq_classifier.remove_all_rsps()