]> git.neil.brown.name Git - history.git/commitdiff
Integrate NAPI work done by Jamal Hadi Salim,
authorDavid S. Miller <davem@nuts.ninka.net>
Wed, 13 Mar 2002 09:04:16 +0000 (01:04 -0800)
committerDavid S. Miller <davem@nuts.ninka.net>
Wed, 13 Mar 2002 09:04:16 +0000 (01:04 -0800)
Robert Olsson, and Alexey Kuznetsov.  This changeset adds
the framework and implementation, but drivers need to be
ported to NAPI in order to take advantage of the new
facilities.  NAPI is fully backwards compatible, current
drivers will continue to work as they always have.

NAPI is a way for dealing with high packet load.  It allows
the driver to disable the RX interrupts on the card and enter
a polling mode.  Another way to describe NAPI would be as
implicit mitigation.  Once the device enters this polling
mode, it will exit back to interrupt based processing when
the receive packet queue is purged.

A full porting and description document is found at:
Documentation/networking/NAPI_HOWTO.txt
and this also makes reference to Usenix papers on the
web and other such resources available on NAPI.

NAPI has been found to not only increase packet processing
rates, it also gives greater fairness to the other interfaces
in the system which are not experiencing high packet load.

Documentation/networking/NAPI_HOWTO.txt [new file with mode: 0644]
include/linux/netdevice.h
include/linux/sysctl.h
net/core/dev.c
net/core/sysctl_net_core.c
net/netsyms.c

diff --git a/Documentation/networking/NAPI_HOWTO.txt b/Documentation/networking/NAPI_HOWTO.txt
new file mode 100644 (file)
index 0000000..44811d3
--- /dev/null
@@ -0,0 +1,749 @@
+HISTORY:
+February 16/2002 -- revision 0.2.1:
+COR typo corrected
+February 10/2002 -- revision 0.2:
+some spell checking ;->
+January 12/2002 -- revision 0.1
+This is still work in progress so may change.
+To keep up to date please watch this space.
+
+Introduction to NAPI
+====================
+
+NAPI is a proven (www.cyberus.ca/~hadi/usenix-paper.tgz) technique
+to improve network performance on Linux. For more details please
+read that paper.
+NAPI provides a "inherent mitigation" which is bound by system capacity
+as can be seen from the following data collected by Robert on Gigabit 
+ethernet (e1000):
+
+ Psize    Ipps       Tput     Rxint     Txint    Done     Ndone
+ ---------------------------------------------------------------
+   60    890000     409362        17     27622        7     6823
+  128    758150     464364        21      9301       10     7738
+  256    445632     774646        42     15507       21    12906
+  512    232666     994445    241292     19147   241192     1062
+ 1024    119061    1000003    872519     19258   872511        0
+ 1440     85193    1000003    946576     19505   946569        0
+
+Legend:
+"Ipps" stands for input packets per second. 
+"Tput" == packets out of total 1M that made it out.
+"txint" == transmit completion interrupts seen
+"Done" == The number of times that the poll() managed to pull all
+packets out of the rx ring. Note from this that the lower the
+load the more we could clean up the rxring
+"Ndone" == is the converse of "Done". Note again, that the higher
+the load the more times we couldnt clean up the rxring.
+
+Observe that:
+when the NIC receives 890Kpackets/sec only 17 rx interrupts are generated. 
+The system cant handle the processing at 1 interrupt/packet at that load level. 
+At lower rates on the other hand, rx interrupts go up and therefore the
+interrupt/packet ratio goes up (as observable from that table). So there is
+possibility that under low enough input, you get one poll call for each
+input packet caused by a single interrupt each time. And if the system 
+cant handle interrupt per packet ratio of 1, then it will just have to 
+chug along ....
+
+
+0) Prerequisites:
+==================
+A driver MAY continue using the old 2.4 technique for interfacing
+to the network stack and not benefit from the NAPI changes.
+NAPI additions to the kernel do not break backward compatibility.
+NAPI, however, requires the following features to be available:
+
+A) DMA ring or enough RAM to store packets in software devices.
+
+B) Ability to turn off interrupts or maybe events that send packets up 
+the stack.
+
+NAPI processes packet events in what is known as dev->poll() method.
+Typically, only packet receive events are processed in dev->poll(). 
+The rest of the events MAY be processed by the regular interrupt handler 
+to reduce processing latency (justified also because there are not that 
+many of them).
+Note, however, NAPI does not enforce that dev->poll() only processes 
+receive events. 
+Tests with the tulip driver indicated slightly increased latency if
+all of the interrupt handler is moved to dev->poll(). Also MII handling
+gets a little trickier.
+The example used in this document is to move the receive processing only
+to dev->poll(); this is shown with the patch for the tulip driver.
+For an example of code that moves all the interrupt driver to 
+dev->poll() look at the ported e1000 code.
+
+There are caveats that might force you to go with moving everything to 
+dev->poll(). Different NICs work differently depending on their status/event 
+acknowledgement setup. 
+There are two types of event register ACK mechanisms.
+       I)  what is known as Clear-on-read (COR).
+       when you read the status/event register, it clears everything!
+       The natsemi and sunbmac NICs are known to do this.
+       In this case your only choice is to move all to dev->poll()
+
+       II) Clear-on-write (COW)
+        i) you clear the status by writting a 1 in the bit-location you want.
+               These are the majority of the NICs and work the best with NAPI.
+               Put only receive events in dev->poll(); leave the rest in
+               the old interrupt handler.
+        ii) whatever you write in the status register clears every thing ;->
+               Cant seem to find any supported by Linux which do this. If
+               someone knows such a chip email us please.
+               Move all to dev->poll()
+
+C) Ability to detect new work correctly.
+NAPI works by shutting down event interrupts when theres work and
+turning them on when theres none. 
+New packets might show up in the small window while interrupts were being 
+re-enabled (refer to appendix 2).  A packet might sneak in during the period 
+we are enabling interrupts. We only get to know about such a packet when the 
+next new packet arrives and generates an interrupt. 
+Essentially, there is a small window of opportunity for a race condition
+which for clarity we'll refer to as the "rotting packet".
+
+This is a very important topic and appendix 2 is dedicated for more 
+discussion.
+
+Locking rules and environmental guarantees
+==========================================
+
+-Guarantee: Only one CPU at any time can call dev->poll(); this is because
+only one CPU can pick the initial interrupt and hence the initial
+netif_rx_schedule(dev);
+- The core layer invokes devices to send packets in a round robin format.
+This implies receive is totaly lockless because of the guarantee only that 
+one CPU is executing it.
+-  contention can only be the result of some other CPU accessing the rx
+ring. This happens only in close() and suspend() (when these methods
+try to clean the rx ring); 
+****guarantee: driver authors need not worry about this; synchronization 
+is taken care for them by the top net layer.
+-local interrupts are enabled (if you dont move all to dev->poll()). For 
+example link/MII and txcomplete continue functioning just same old way. 
+This improves the latency of processing these events. It is also assumed that 
+the receive interrupt is the largest cause of noise. Note this might not 
+always be true. 
+[according to Manfred Spraul, the winbond insists on sending one 
+txmitcomplete interrupt for each packet (although this can be mitigated)].
+For these broken drivers, move all to dev->poll().
+
+For the rest of this text, we'll assume that dev->poll() only
+processes receive events.
+
+new methods introduce by NAPI
+=============================
+
+a) netif_rx_schedule(dev)
+Called by an IRQ handler to schedule a poll for device
+
+b) netif_rx_schedule_prep(dev)
+puts the device in a state which allows for it to be added to the
+CPU polling list if it is up and running. You can look at this as
+the first half of  netif_rx_schedule(dev) above; the second half
+being c) below.
+
+c) __netif_rx_schedule(dev)
+Add device to the poll list for this CPU; assuming that _prep above
+has already been called and returned 1.
+
+d) netif_rx_reschedule(dev, undo)
+Called to reschedule polling for device specifically for some
+deficient hardware. Read Appendix 2 for more details.
+
+e) netif_rx_complete(dev)
+
+Remove interface from the CPU poll list: it must be in the poll list
+on current cpu. This primitive is called by dev->poll(), when
+it completes its work. The device cannot be out of poll list at this
+call, if it is then clearly it is a BUG(). You'll know ;->
+
+All these above nethods are used below. So keep reading for clarity.
+
+Device driver changes to be made when porting NAPI
+==================================================
+
+Below we describe what kind of changes are required for NAPI to work.
+
+1) introduction of dev->poll() method 
+=====================================
+
+This is the method that is invoked by the network core when it requests
+for new packets from the driver. A driver is allowed to send upto
+dev->quota packets by the current CPU before yielding to the network
+subsystem (so other devices can also get opportunity to send to the stack).
+
+dev->poll() prototype looks as follows:
+int my_poll(struct net_device *dev, int *budget)
+
+budget is the remaining number of packets the network subsystem on the
+current CPU can send up the stack before yielding to other system tasks.
+*Each driver is responsible for decrementing budget by the total number of
+packets sent.
+       Total number of packets cannot exceed dev->quota.
+
+dev->poll() method is invoked by the top layer, the driver just sends if it 
+can to the stack the packet quantity requested.
+
+more on dev->poll() below after the interrupt changes are explained.
+
+2) registering dev->poll() method
+===================================
+
+dev->poll should be set in the dev->probe() method. 
+e.g:
+dev->open = my_open;
+.
+.
+/* two new additions */
+/* first register my poll method */
+dev->poll = my_poll;
+/* next register my weight/quanta; can be overriden in /proc */
+dev->weight = 16;
+.
+.
+dev->stop = my_close;
+
+
+
+3) scheduling dev->poll()
+=============================
+This involves modifying the interrupt handler and the code
+path which takes the packet off the NIC and sends them to the 
+stack.
+
+it's important at this point to introduce the classical D Becker 
+interrupt processor:
+
+------------------
+static void
+netdevice_interrupt(int irq, void *dev_id, struct pt_regs *regs)
+{
+
+       struct net_device *dev = (struct net_device *)dev_instance;
+       struct my_private *tp = (struct my_private *)dev->priv;
+
+       int work_count = my_work_count;
+        status = read_interrupt_status_reg();
+        if (status == 0)
+                return;         /* Shared IRQ: not us */
+        if (status == 0xffff)
+                return;         /* Hot unplug */
+        if (status & error)
+               do_some_error_handling()
+        
+       do {
+               acknowledge_ints_ASAP();
+
+               if (status & link_interrupt) {
+                       spin_lock(&tp->link_lock);
+                       do_some_link_stat_stuff();
+                       spin_lock(&tp->link_lock);
+               }
+               
+               if (status & rx_interrupt) {
+                       receive_packets(dev);
+               }
+
+               if (status & rx_nobufs) {
+                       make_rx_buffs_avail();
+               }
+                       
+               if (status & tx_related) {
+                       spin_lock(&tp->lock);
+                       tx_ring_free(dev);
+                       if (tx_died)
+                               restart_tx();
+                       spin_unlock(&tp->lock);
+               }
+
+               status = read_interrupt_status_reg();
+
+       } while (!(status & error) || more_work_to_be_done);
+
+}
+
+----------------------------------------------------------------------
+
+We now change this to what is shown below to NAPI-enable it:
+
+----------------------------------------------------------------------
+static void
+netdevice_interrupt(int irq, void *dev_id, struct pt_regs *regs)
+{
+       struct net_device *dev = (struct net_device *)dev_instance;
+       struct my_private *tp = (struct my_private *)dev->priv;
+
+        status = read_interrupt_status_reg();
+        if (status == 0)
+                return;         /* Shared IRQ: not us */
+        if (status == 0xffff)
+                return;         /* Hot unplug */
+        if (status & error)
+               do_some_error_handling();
+        
+       do {
+/************************ start note *********************************/                
+               acknowledge_ints_ASAP();  // dont ack rx and rxnobuff here
+/************************ end note *********************************/          
+
+               if (status & link_interrupt) {
+                       spin_lock(&tp->link_lock);
+                       do_some_link_stat_stuff();
+                       spin_unlock(&tp->link_lock);
+               }
+/************************ start note *********************************/                
+               if (status & rx_interrupt || (status & rx_nobuffs)) {
+                       if (netif_rx_schedule_prep(dev)) {
+
+                               /* disable interrupts caused 
+                                *      by arriving packets */
+                               disable_rx_and_rxnobuff_ints();
+                               /* tell system we have work to be done. */
+                               __netif_rx_schedule(dev);
+                       } else {
+                               printk("driver bug! interrupt while in poll\n");
+                               /* FIX by disabling interrupts  */
+                               disable_rx_and_rxnobuff_ints();
+                       }
+               }
+/************************ end note note *********************************/             
+                       
+               if (status & tx_related) {
+                       spin_lock(&tp->lock);
+                       tx_ring_free(dev);
+
+                       if (tx_died)
+                               restart_tx();
+                       spin_unlock(&tp->lock);
+               }
+
+               status = read_interrupt_status_reg();
+
+/************************ start note *********************************/                
+       } while (!(status & error) || more_work_to_be_done(status));
+/************************ end note note *********************************/             
+
+}
+
+---------------------------------------------------------------------
+
+
+We note several things from above:
+
+I) Any interrupt source which is caused by arriving packets is now
+turned off when it occurs. Depending on the hardware, there could be
+several reasons that arriving packets would cause interrupts; these are the
+interrupt sources we wish to avoid. The two common ones are a) a packet 
+arriving (rxint) b) a packet arriving and finding no DMA buffers available
+(rxnobuff) .
+This means also acknowledge_ints_ASAP() will not clear the status
+register for those two items above; clearing is done in the place where 
+proper work is done within NAPI; at the poll() and refill_rx_ring() 
+discussed further below.
+netif_rx_schedule_prep() returns 1 if device is in running state and
+gets successfully added to the core poll list. If we get a zero value
+we can _almost_ assume are already added to the list (instead of not running. 
+Logic based on the fact that you shouldnt get interrupt if not running)
+We rectify this by disabling rx and rxnobuf interrupts.
+
+II) that receive_packets(dev) and make_rx_buffs_avail() may have dissapeared.
+These functionalities are still around actually......
+
+infact, receive_packets(dev) is very close to my_poll() and 
+make_rx_buffs_avail() is invoked from my_poll()
+
+4) converting receive_packets() to dev->poll()
+===============================================
+
+We need to convert the classical D Becker receive_packets(dev) to my_poll()
+
+First the typical receive_packets() below:
+-------------------------------------------------------------------
+
+/* this is called by interrupt handler */
+static void receive_packets (struct net_device *dev)
+{
+
+       struct my_private *tp = (struct my_private *)dev->priv;
+       rx_ring = tp->rx_ring;
+       cur_rx = tp->cur_rx;
+       int entry = cur_rx % RX_RING_SIZE;
+       int received = 0;
+       int rx_work_limit = tp->dirty_rx + RX_RING_SIZE - tp->cur_rx;
+
+       while (rx_ring_not_empty) {
+               u32 rx_status;
+               unsigned int rx_size;
+               unsigned int pkt_size;
+               struct sk_buff *skb;
+                /* read size+status of next frame from DMA ring buffer */
+               /* the number 16 and 4 are just examples */
+                rx_status = le32_to_cpu (*(u32 *) (rx_ring + ring_offset));
+                rx_size = rx_status >> 16;
+                pkt_size = rx_size - 4;
+
+               /* process errors */
+                if ((rx_size > (MAX_ETH_FRAME_SIZE+4)) ||
+                    (!(rx_status & RxStatusOK))) {
+                        netdrv_rx_err (rx_status, dev, tp, ioaddr);
+                        return;
+                }
+
+                if (--rx_work_limit < 0)
+                        break;
+
+               /* grab a skb */
+                skb = dev_alloc_skb (pkt_size + 2);
+                if (skb) {
+                       .
+                       .
+                       netif_rx (skb);
+                       .
+                       .
+                } else {  /* OOM */
+                       /*seems very driver specific ... some just pass
+                       whatever is on the ring already. */
+                }
+
+               /* move to the next skb on the ring */
+               entry = (++tp->cur_rx) % RX_RING_SIZE;
+               received++ ;
+
+        }
+
+       /* store current ring pointer state */
+        tp->cur_rx = cur_rx;
+
+        /* Refill the Rx ring buffers if they are needed */
+       refill_rx_ring();
+       .
+       .
+
+}
+-------------------------------------------------------------------
+We change it to a new one below; note the additional parameter in
+the call.
+
+-------------------------------------------------------------------
+
+/* this is called by the network core */
+static void my_poll (struct net_device *dev, int *budget)
+{
+
+       struct my_private *tp = (struct my_private *)dev->priv;
+       rx_ring = tp->rx_ring;
+       cur_rx = tp->cur_rx;
+       int entry = cur_rx % RX_BUF_LEN;
+       /* maximum packets to send to the stack */
+/************************ note note *********************************/         
+       int rx_work_limit = dev->quota;
+
+/************************ end note note *********************************/             
+    do {  // outer beggining loop starts here
+
+       clear_rx_status_register_bit();
+
+       while (rx_ring_not_empty) {
+               u32 rx_status;
+               unsigned int rx_size;
+               unsigned int pkt_size;
+               struct sk_buff *skb;
+                /* read size+status of next frame from DMA ring buffer */
+               /* the number 16 and 4 are just examples */
+                rx_status = le32_to_cpu (*(u32 *) (rx_ring + ring_offset));
+                rx_size = rx_status >> 16;
+                pkt_size = rx_size - 4;
+
+               /* process errors */
+                if ((rx_size > (MAX_ETH_FRAME_SIZE+4)) ||
+                    (!(rx_status & RxStatusOK))) {
+                        netdrv_rx_err (rx_status, dev, tp, ioaddr);
+                        return;
+                }
+
+/************************ note note *********************************/         
+                if (--rx_work_limit < 0) { /* we got packets, but no quota */
+                       /* store current ring pointer state */
+                       tp->cur_rx = cur_rx;
+
+                       /* Refill the Rx ring buffers if they are needed */
+                       refill_rx_ring(dev);
+                        goto not_done;
+               }
+/**********************  end note **********************************/
+
+               /* grab a skb */
+                skb = dev_alloc_skb (pkt_size + 2);
+                if (skb) {
+                       .
+                       .
+/************************ note note *********************************/         
+                       netif_receive_skb (skb);
+/**********************  end note **********************************/
+                       .
+                       .
+                } else {  /* OOM */
+                       /*seems very driver specific ... common is just pass
+                       whatever is on the ring already. */
+                }
+
+               /* move to the next skb on the ring */
+               entry = (++tp->cur_rx) % RX_RING_SIZE;
+               received++ ;
+
+        }
+
+       /* store current ring pointer state */
+        tp->cur_rx = cur_rx;
+
+        /* Refill the Rx ring buffers if they are needed */
+       refill_rx_ring(dev);
+       
+       /* no packets on ring; but new ones can arrive since we last 
+          checked  */
+       status = read_interrupt_status_reg();
+       if (rx status is not set) {
+                        /* If something arrives in this narrow window,
+                       an interrupt will be generated */
+                        goto done;
+       }
+       /* done! at least thats what it looks like ;->
+       if new packets came in after our last check on status bits
+       they'll be caught by the while check and we go back and clear them 
+       since we havent exceeded our quota */
+    } while (rx_status_is_set); 
+
+done:
+
+/************************ note note *********************************/         
+        dev->quota -= received;
+        *budget -= received;
+
+        /* If RX ring is not full we are out of memory. */
+        if (tp->rx_buffers[tp->dirty_rx % RX_RING_SIZE].skb == NULL)
+                goto oom;
+
+       /* we are happy/done, no more packets on ring; put us back
+       to where we can start processing interrupts again */
+        netif_rx_complete(dev);
+       enable_rx_and_rxnobuf_ints();
+
+       /* The last op happens after poll completion. Which means the following:
+        * 1. it can race with disabling irqs in irq handler (which are done to 
+       * schedule polls)
+        * 2. it can race with dis/enabling irqs in other poll threads
+        * 3. if an irq raised after the begining of the outer  beginning 
+        * loop(marked in the code above), it will be immediately
+        * triggered here.
+        *
+        * Summarizing: the logic may results in some redundant irqs both
+        * due to races in masking and due to too late acking of already
+        * processed irqs. The good news: no events are ever lost.
+        */
+
+        return 0;   /* done */
+
+not_done:
+        if (tp->cur_rx - tp->dirty_rx > RX_RING_SIZE/2 ||
+            tp->rx_buffers[tp->dirty_rx % RX_RING_SIZE].skb == NULL)
+                refill_rx_ring(dev);
+
+        if (!received) {
+                printk("received==0\n");
+                received = 1;
+        }
+        dev->quota -= received;
+        *budget -= received;
+        return 1;  /* not_done */
+
+oom:
+        /* Start timer, stop polling, but do not enable rx interrupts. */
+       start_poll_timer(dev);
+        return 0;  /* we'll take it from here so tell core "done"*/
+
+/************************ End note note *********************************/             
+}
+-------------------------------------------------------------------
+
+From above we note that:
+0) rx_work_limit = dev->quota 
+1) refill_rx_ring() is in charge of clearing the bit for rxnobuff when
+it does the work.
+2) We have a done and not_done state.
+3) instead of netif_rx() we call netif_receive_skb() to pass the skb.
+4) we have a new way of handling oom condition
+5) A new outer for (;;) loop has been added. This serves the purpose of
+ensuring that if a new packet has come in, after we are all set and done,
+and we have not exceeded our quota that we continue sending packets up.
+
+-----------------------------------------------------------
+Poll timer code will need to do the following:
+
+a) 
+
+        if (tp->cur_rx - tp->dirty_rx > RX_RING_SIZE/2 ||
+            tp->rx_buffers[tp->dirty_rx % RX_RING_SIZE].skb == NULL) 
+                refill_rx_ring(dev);
+
+        /* If RX ring is not full we are still out of memory.
+          Restart the timer again. Else we re-add ourselves 
+           to the master poll list.
+         */
+
+        if (tp->rx_buffers[tp->dirty_rx % RX_RING_SIZE].skb == NULL)
+                restart_timer();
+
+       else netif_rx_schedule(dev);  /* we are back on the poll list */
+       
+5) dev->close() and dev->suspend() issues
+==========================================
+The driver writter neednt worry about this. The top net layer takes
+care of it.
+
+6) Adding new Stats to /proc 
+=============================
+In order to debug some of the new features, we introduce new stats
+that need to be collected.
+TODO: Fill this later.
+
+APPENDIX 1: discussion on using ethernet HW FC
+==============================================
+Most chips with FC only send a pause packet when they run out of Rx buffers.
+Since packets are pulled off the DMA ring by a softirq in NAPI,
+if the system is slow in grabbing them and we have a high input
+rate (faster than the system's capacity to remove packets), then theoretically
+there will only be one rx interrupt for all packets during a given packetstorm.
+Under low load, we might have a single interrupt per packet.
+FC should be programmed to apply in the case when the system cant pull out
+packets fast enough i.e send a pause only when you run out of rx buffers.
+Note FC in itself is a good solution but we have found it to not be
+much of a commodity feature (both in NICs and switches) and hence falls
+under the same category as using NIC based mitigation. Also experiments
+indicate that its much harder to resolve the resource allocation
+issue (aka lazy receiving that NAPI offers) and hence quantify its usefullness
+proved harder. In any case, FC works even better with NAPI but is not
+necessary.
+
+
+APPENDIX 2: the "rotting packet" race-window avoidance scheme 
+=============================================================
+
+There are two types of associations seen here
+
+1) status/int which honors level triggered IRQ
+
+If a status bit for receive or rxnobuff is set and the corresponding 
+interrupt-enable bit is not on, then no interrupts will be generated. However, 
+as soon as the "interrupt-enable" bit is unmasked, an immediate interrupt is 
+generated.  [assuming the status bit was not turned off].
+Generally the concept of level triggered IRQs in association with a status and
+interrupt-enable CSR register set is used to avoid the race.
+
+If we take the example of the tulip:
+"pending work" is indicated by the status bit(CSR5 in tulip).
+the corresponding interrupt bit (CSR7 in tulip) might be turned off (but
+the CSR5 will continue to be turned on with new packet arrivals even if
+we clear it the first time)
+Very important is the fact that if we turn on the interrupt bit on when
+status is set that an immediate irq is triggered.
+If we cleared the rx ring and proclaimed there was "no more work
+to be done" and then went on to do a few other things;  then when we enable
+interrupts, there is a possibility that a new packet might sneak in during
+this phase. It helps to look at the pseudo code for the tulip poll
+routine:
+
+--------------------------
+        do {
+                ACK;
+                while (ring_is_not_empty()) {
+                        work-work-work
+                        if quota is exceeded: exit, no touching irq status/mask
+                }
+                /* No packets, but new can arrive while we are doing this*/
+                CSR5 := read
+                if (CSR5 is not set) {
+                        /* If something arrives in this narrow window here,
+                        *  where the comments are ;-> irq will be generated */
+                        unmask irqs;
+                        exit poll;
+                }
+        } while (rx_status_is_set);
+------------------------
+
+CSR5 bit of interest is only the rx status. 
+If you look at the last if statement: 
+you just finished grabbing all the packets from the rx ring .. you check if
+status bit says theres more packets just in ... it says none; you then
+enable rx interrupts again; if a new packet just came in during this check,
+we are counting that CSR5 will be set in that small window of opportunity
+and that by re-enabling interrupts, we would actually triger an interrupt
+to register the new packet for processing.
+
+[The above description nay be very verbose, if you have better wording 
+that will make this more understandable, please suggest it.]
+
+2) non-capable hardware
+
+These do not generally respect level triggered IRQs. Normally,
+irqs may be lost while being masked and the only way to leave poll is to do
+a double check for new input after netif_rx_complete() is invoked
+and re-enable polling (after seeing this new input).
+
+Sample code:
+
+---------
+       .
+       .
+restart_poll:
+       while (ring_is_not_empty()) {
+               work-work-work
+               if quota is exceeded: exit, not touching irq status/mask
+       }
+       .
+       .
+       .
+       enable_rx_interrupts()
+       netif_rx_complete(dev);
+       if (ring_has_new_packet() && netif_rx_reschedule(dev, received)) {
+               disable_rx_and_rxnobufs()
+               goto restart_poll
+       } while (rx_status_is_set);
+---------
+               
+Basically netif_rx_complete() removes us from the poll list, but because a
+new packet which will never be caught due to the possibility of a race
+might come in, we attempt to re-add ourselves to the poll list. 
+
+
+
+--------------------------------------------------------------------
+
+relevant sites:
+==================
+ftp://robur.slu.se/pub/Linux/net-development/NAPI/
+
+
+--------------------------------------------------------------------
+TODO: Write net-skeleton.c driver.
+-------------------------------------------------------------
+
+Authors:
+========
+Alexey Kuznetsov <kuznet@ms2.inr.ac.ru>
+Jamal Hadi Salim <hadi@cyberus.ca>
+Robert Olsson <Robert.Olsson@data.slu.se>
+
+Acknowledgements:
+================
+People who made this document better:
+
+Lennert Buytenhek <buytenh@gnu.org>
+Andrew Morton  <akpm@zip.com.au>
+Manfred Spraul <manfred@colorfullife.com>
+Donald Becker <becker@scyld.com>
+Jeff Garzik <jgarzik@mandrakesoft.com>
index 178f6a5a0fe6d28260e80a8194b960daa5a0c811..32b6db3c7a2c402f6834ca0e52374eb4a2a3f927 100644 (file)
@@ -206,7 +206,8 @@ enum netdev_state_t
        __LINK_STATE_START,
        __LINK_STATE_PRESENT,
        __LINK_STATE_SCHED,
-       __LINK_STATE_NOCARRIER
+       __LINK_STATE_NOCARRIER,
+       __LINK_STATE_RX_SCHED
 };
 
 
@@ -330,6 +331,10 @@ struct net_device
        void                    *ip6_ptr;       /* IPv6 specific data */
        void                    *ec_ptr;        /* Econet specific data */
 
+       struct list_head        poll_list;      /* Link to poll list    */
+       int                     quota;
+       int                     weight;
+
        struct Qdisc            *qdisc;
        struct Qdisc            *qdisc_sleeping;
        struct Qdisc            *qdisc_list;
@@ -373,6 +378,7 @@ struct net_device
        int                     (*stop)(struct net_device *dev);
        int                     (*hard_start_xmit) (struct sk_buff *skb,
                                                    struct net_device *dev);
+       int                     (*poll) (struct net_device *dev, int *quota);
        int                     (*hard_header) (struct sk_buff *skb,
                                                struct net_device *dev,
                                                unsigned short type,
@@ -492,8 +498,11 @@ struct softnet_data
        int                     cng_level;
        int                     avg_blog;
        struct sk_buff_head     input_pkt_queue;
+       struct list_head        poll_list;
        struct net_device       *output_queue;
        struct sk_buff          *completion_queue;
+
+       struct net_device       backlog_dev;    /* Sorry. 8) */
 } __attribute__((__aligned__(SMP_CACHE_BYTES)));
 
 
@@ -547,6 +556,7 @@ static inline int netif_running(struct net_device *dev)
        return test_bit(__LINK_STATE_START, &dev->state);
 }
 
+
 /* Use this variant when it is known for sure that it
  * is executing from interrupt context.
  */
@@ -578,6 +588,8 @@ static inline void dev_kfree_skb_any(struct sk_buff *skb)
 extern void            net_call_rx_atomic(void (*fn)(void));
 #define HAVE_NETIF_RX 1
 extern int             netif_rx(struct sk_buff *skb);
+#define HAVE_NETIF_RECEIVE_SKB 1
+extern int             netif_receive_skb(struct sk_buff *skb);
 extern int             dev_ioctl(unsigned int cmd, void *);
 extern int             dev_change_flags(struct net_device *, unsigned);
 extern void            dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev);
@@ -695,6 +707,78 @@ enum {
 #define netif_msg_rx_status(p) ((p)->msg_enable & NETIF_MSG_RX_STATUS)
 #define netif_msg_pktdata(p)   ((p)->msg_enable & NETIF_MSG_PKTDATA)
 
+/* Schedule rx intr now? */
+
+static inline int netif_rx_schedule_prep(struct net_device *dev)
+{
+       return netif_running(dev) &&
+               !test_and_set_bit(__LINK_STATE_RX_SCHED, &dev->state);
+}
+
+/* Add interface to tail of rx poll list. This assumes that _prep has
+ * already been called and returned 1.
+ */
+
+static inline void __netif_rx_schedule(struct net_device *dev)
+{
+       unsigned long flags;
+       int cpu = smp_processor_id();
+
+       local_irq_save(flags);
+       dev_hold(dev);
+       list_add_tail(&dev->poll_list, &softnet_data[cpu].poll_list);
+       if (dev->quota < 0)
+               dev->quota += dev->weight;
+       else
+               dev->quota = dev->weight;
+       __cpu_raise_softirq(cpu, NET_RX_SOFTIRQ);
+       local_irq_restore(flags);
+}
+
+/* Try to reschedule poll. Called by irq handler. */
+
+static inline void netif_rx_schedule(struct net_device *dev)
+{
+       if (netif_rx_schedule_prep(dev))
+               __netif_rx_schedule(dev);
+}
+
+/* Try to reschedule poll. Called by dev->poll() after netif_rx_complete().
+ * Do not inline this?
+ */
+static inline int netif_rx_reschedule(struct net_device *dev, int undo)
+{
+       if (netif_rx_schedule_prep(dev)) {
+               unsigned long flags;
+               int cpu = smp_processor_id();
+
+               dev->quota += undo;
+
+               local_irq_save(flags);
+               list_add_tail(&dev->poll_list, &softnet_data[cpu].poll_list);
+               __cpu_raise_softirq(cpu, NET_RX_SOFTIRQ);
+               local_irq_restore(flags);
+               return 1;
+       }
+       return 0;
+}
+
+/* Remove interface from poll list: it must be in the poll list
+ * on current cpu. This primitive is called by dev->poll(), when
+ * it completes the work. The device cannot be out of poll list at this
+ * moment, it is BUG().
+ */
+static inline void netif_rx_complete(struct net_device *dev)
+{
+       unsigned long flags;
+
+       local_irq_save(flags);
+       if (!test_bit(__LINK_STATE_RX_SCHED, &dev->state)) BUG();
+       list_del(&dev->poll_list);
+       clear_bit(__LINK_STATE_RX_SCHED, &dev->state);
+       local_irq_restore(flags);
+}
+
 /* These functions live elsewhere (drivers/net/net_init.c, but related) */
 
 extern void            ether_setup(struct net_device *dev);
@@ -719,6 +803,7 @@ extern void         dev_mcast_init(void);
 extern int             netdev_register_fc(struct net_device *dev, void (*stimul)(struct net_device *dev));
 extern void            netdev_unregister_fc(int bit);
 extern int             netdev_max_backlog;
+extern int             weight_p;
 extern unsigned long   netdev_fc_xoff;
 extern atomic_t netdev_dropping;
 extern int             netdev_set_master(struct net_device *dev, struct net_device *master);
index 938560387354b58521888e74b8eef0270ee7ae64..01829afb8e41bf9bc32495eaf8a12f225753a0ff 100644 (file)
@@ -202,7 +202,8 @@ enum
        NET_CORE_NO_CONG_THRESH=13,
        NET_CORE_NO_CONG=14,
        NET_CORE_LO_CONG=15,
-       NET_CORE_MOD_CONG=16
+       NET_CORE_MOD_CONG=16,
+       NET_CORE_DEV_WEIGHT=17
 };
 
 /* /proc/sys/net/ethernet */
index 6a510b1a8ea41c972426183d89794b3b1c818adf..8c340f76aa568f3500c333781866767fcbc9b628 100644 (file)
@@ -798,6 +798,19 @@ int dev_close(struct net_device *dev)
 
        clear_bit(__LINK_STATE_START, &dev->state);
 
+       /* Synchronize to scheduled poll. We cannot touch poll list,
+        * it can be even on different cpu. So just clear netif_running(),
+        * and wait when poll really will happen. Actually, the best place
+        * for this is inside dev->stop() after device stopped its irq
+        * engine, but this requires more changes in devices. */
+
+       smp_mb__after_clear_bit(); /* Commit netif_running(). */
+       while (test_bit(__LINK_STATE_RX_SCHED, &dev->state)) {
+               /* No hurry. */
+               current->state = TASK_INTERRUPTIBLE;
+               schedule_timeout(1);
+       }
+
        /*
         *      Call the device specific close. This cannot fail.
         *      Only if device is UP
@@ -1072,6 +1085,7 @@ int dev_queue_xmit(struct sk_buff *skb)
   =======================================================================*/
 
 int netdev_max_backlog = 300;
+int weight_p = 64;            /* old backlog weight */
 /* These numbers are selected based on intuition and some
  * experimentatiom, if you have more scientific way of doing this
  * please go ahead and fix things.
@@ -1237,13 +1251,11 @@ int netif_rx(struct sk_buff *skb)
 enqueue:
                        dev_hold(skb->dev);
                        __skb_queue_tail(&queue->input_pkt_queue,skb);
-                       /* Runs from irqs or BH's, no need to wake BH */
-                       cpu_raise_softirq(this_cpu, NET_RX_SOFTIRQ);
                        local_irq_restore(flags);
 #ifndef OFFLINE_SAMPLE
                        get_sample_stats(this_cpu);
 #endif
-                       return softnet_data[this_cpu].cng_level;
+                       return queue->cng_level;
                }
 
                if (queue->throttle) {
@@ -1253,6 +1265,8 @@ enqueue:
                                netdev_wakeup();
 #endif
                }
+
+               netif_rx_schedule(&queue->backlog_dev);
                goto enqueue;
        }
 
@@ -1308,19 +1322,12 @@ static int deliver_to_old_ones(struct packet_type *pt, struct sk_buff *skb, int
        return ret;
 }
 
-/* Reparent skb to master device. This function is called
- * only from net_rx_action under BR_NETPROTO_LOCK. It is misuse
- * of BR_NETPROTO_LOCK, but it is OK for now.
- */
 static __inline__ void skb_bond(struct sk_buff *skb)
 {
        struct net_device *dev = skb->dev;
-       
-       if (dev->master) {
-               dev_hold(dev->master);
+
+       if (dev->master)
                skb->dev = dev->master;
-               dev_put(dev);
-       }
 }
 
 static void net_tx_action(struct softirq_action *h)
@@ -1416,121 +1423,138 @@ static inline void handle_diverter(struct sk_buff *skb)
 }
 #endif   /* CONFIG_NET_DIVERT */
 
-
-static void net_rx_action(struct softirq_action *h)
+int netif_receive_skb(struct sk_buff *skb)
 {
-       int this_cpu = smp_processor_id();
-       struct softnet_data *queue = &softnet_data[this_cpu];
-       unsigned long start_time = jiffies;
-       int bugdet = netdev_max_backlog;
-
-       br_read_lock(BR_NETPROTO_LOCK);
-
-       for (;;) {
-               struct sk_buff *skb;
-               struct net_device *rx_dev;
-
-               local_irq_disable();
-               skb = __skb_dequeue(&queue->input_pkt_queue);
-               local_irq_enable();
+       struct packet_type *ptype, *pt_prev;
+       int ret = NET_RX_DROP;
+       unsigned short type = skb->protocol;
 
-               if (skb == NULL)
-                       break;
+       if (skb->stamp.tv_sec == 0)
+               do_gettimeofday(&skb->stamp);
 
-               skb_bond(skb);
+       skb_bond(skb);
 
-               rx_dev = skb->dev;
+       netdev_rx_stat[smp_processor_id()].total++;
 
 #ifdef CONFIG_NET_FASTROUTE
-               if (skb->pkt_type == PACKET_FASTROUTE) {
-                       netdev_rx_stat[this_cpu].fastroute_deferred_out++;
-                       dev_queue_xmit(skb);
-                       dev_put(rx_dev);
-                       continue;
-               }
+       if (skb->pkt_type == PACKET_FASTROUTE) {
+               netdev_rx_stat[smp_processor_id()].fastroute_deferred_out++;
+               return dev_queue_xmit(skb);
+       }
 #endif
-               skb->h.raw = skb->nh.raw = skb->data;
-               {
-                       struct packet_type *ptype, *pt_prev;
-                       unsigned short type = skb->protocol;
-
-                       pt_prev = NULL;
-                       for (ptype = ptype_all; ptype; ptype = ptype->next) {
-                               if (!ptype->dev || ptype->dev == skb->dev) {
-                                       if (pt_prev) {
-                                               if (!pt_prev->data) {
-                                                       deliver_to_old_ones(pt_prev, skb, 0);
-                                               } else {
-                                                       atomic_inc(&skb->users);
-                                                       pt_prev->func(skb,
-                                                                     skb->dev,
-                                                                     pt_prev);
-                                               }
-                                       }
-                                       pt_prev = ptype;
+
+       skb->h.raw = skb->nh.raw = skb->data;
+
+       pt_prev = NULL;
+       for (ptype = ptype_all; ptype; ptype = ptype->next) {
+               if (!ptype->dev || ptype->dev == skb->dev) {
+                       if (pt_prev) {
+                               if (!pt_prev->data) {
+                                       ret = deliver_to_old_ones(pt_prev, skb, 0);
+                               } else {
+                                       atomic_inc(&skb->users);
+                                       ret = pt_prev->func(skb, skb->dev, pt_prev);
                                }
                        }
+                       pt_prev = ptype;
+               }
+       }
 
 #ifdef CONFIG_NET_DIVERT
-                       if (skb->dev->divert && skb->dev->divert->divert)
-                               handle_diverter(skb);
+       if (skb->dev->divert && skb->dev->divert->divert)
+               ret = handle_diverter(skb);
 #endif /* CONFIG_NET_DIVERT */
-
                        
 #if defined(CONFIG_BRIDGE) || defined(CONFIG_BRIDGE_MODULE)
-                       if (skb->dev->br_port != NULL &&
-                           br_handle_frame_hook != NULL) {
-                               handle_bridge(skb, pt_prev);
-                               dev_put(rx_dev);
-                               continue;
-                       }
+       if (skb->dev->br_port != NULL &&
+           br_handle_frame_hook != NULL) {
+               return handle_bridge(skb, pt_prev);
+       }
 #endif
 
-                       for (ptype=ptype_base[ntohs(type)&15];ptype;ptype=ptype->next) {
-                               if (ptype->type == type &&
-                                   (!ptype->dev || ptype->dev == skb->dev)) {
-                                       if (pt_prev) {
-                                               if (!pt_prev->data)
-                                                       deliver_to_old_ones(pt_prev, skb, 0);
-                                               else {
-                                                       atomic_inc(&skb->users);
-                                                       pt_prev->func(skb,
-                                                                     skb->dev,
-                                                                     pt_prev);
-                                               }
-                                       }
-                                       pt_prev = ptype;
+       for (ptype=ptype_base[ntohs(type)&15];ptype;ptype=ptype->next) {
+               if (ptype->type == type &&
+                   (!ptype->dev || ptype->dev == skb->dev)) {
+                       if (pt_prev) {
+                               if (!pt_prev->data) {
+                                       ret = deliver_to_old_ones(pt_prev, skb, 0);
+                               } else {
+                                       atomic_inc(&skb->users);
+                                       ret = pt_prev->func(skb, skb->dev, pt_prev);
                                }
                        }
+                       pt_prev = ptype;
+               }
+       }
 
-                       if (pt_prev) {
-                               if (!pt_prev->data)
-                                       deliver_to_old_ones(pt_prev, skb, 1);
-                               else
-                                       pt_prev->func(skb, skb->dev, pt_prev);
-                       } else
-                               kfree_skb(skb);
+       if (pt_prev) {
+               if (!pt_prev->data) {
+                       ret = deliver_to_old_ones(pt_prev, skb, 1);
+               } else {
+                       ret = pt_prev->func(skb, skb->dev, pt_prev);
                }
+       } else {
+               kfree_skb(skb);
+               /* Jamal, now you will not able to escape explaining
+                * me how you were going to use this. :-)
+                */
+               ret = NET_RX_DROP;
+       }
 
-               dev_put(rx_dev);
+       return ret;
+}
 
-               if (bugdet-- < 0 || jiffies - start_time > 1)
-                       goto softnet_break;
+static int process_backlog(struct net_device *backlog_dev, int *budget)
+{
+       int work = 0;
+       int quota = min(backlog_dev->quota, *budget);
+       int this_cpu = smp_processor_id();
+       struct softnet_data *queue = &softnet_data[this_cpu];
+       unsigned long start_time = jiffies;
+
+       for (;;) {
+               struct sk_buff *skb;
+               struct net_device *dev;
+
+               local_irq_disable();
+               skb = __skb_dequeue(&queue->input_pkt_queue);
+               if (skb == NULL)
+                       goto job_done;
+               local_irq_enable();
+
+               dev = skb->dev;
+
+               netif_receive_skb(skb);
+
+               dev_put(dev);
+
+               work++;
+
+               if (work >= quota || jiffies - start_time > 1)
+                       break;
 
 #ifdef CONFIG_NET_HW_FLOWCONTROL
-       if (queue->throttle && queue->input_pkt_queue.qlen < no_cong_thresh ) {
-               if (atomic_dec_and_test(&netdev_dropping)) {
-                       queue->throttle = 0;
-                       netdev_wakeup();
-                       goto softnet_break;
+               if (queue->throttle && queue->input_pkt_queue.qlen < no_cong_thresh ) {
+                       if (atomic_dec_and_test(&netdev_dropping)) {
+                               queue->throttle = 0;
+                               netdev_wakeup();
+                               break;
+                       }
                }
-       }
 #endif
-
        }
-       br_read_unlock(BR_NETPROTO_LOCK);
 
-       local_irq_disable();
+       backlog_dev->quota -= work;
+       *budget -= work;
+       return -1;
+
+job_done:
+       backlog_dev->quota -= work;
+       *budget -= work;
+
+       list_del(&backlog_dev->poll_list);
+       clear_bit(__LINK_STATE_RX_SCHED, &backlog_dev->state);
+
        if (queue->throttle) {
                queue->throttle = 0;
 #ifdef CONFIG_NET_HW_FLOWCONTROL
@@ -1539,21 +1563,53 @@ static void net_rx_action(struct softirq_action *h)
 #endif
        }
        local_irq_enable();
+       return 0;
+}
 
-       NET_PROFILE_LEAVE(softnet_process);
-       return;
+static void net_rx_action(struct softirq_action *h)
+{
+       int this_cpu = smp_processor_id();
+       struct softnet_data *queue = &softnet_data[this_cpu];
+       unsigned long start_time = jiffies;
+       int budget = netdev_max_backlog;
 
-softnet_break:
+       br_read_lock(BR_NETPROTO_LOCK);
+       local_irq_disable();
+
+       while (!list_empty(&queue->poll_list)) {
+               struct net_device *dev;
+
+               if (budget <= 0 || jiffies - start_time > 1)
+                       goto softnet_break;
+
+               local_irq_enable();
+
+               dev = list_entry(queue->poll_list.next, struct net_device, poll_list);
+
+               if (dev->quota <= 0 || dev->poll(dev, &budget)) {
+                       local_irq_disable();
+                       list_del(&dev->poll_list);
+                       list_add_tail(&dev->poll_list, &queue->poll_list);
+                       if (dev->quota < 0)
+                               dev->quota += dev->weight;
+                       else
+                               dev->quota = dev->weight;
+               } else {
+                       dev_put(dev);
+                       local_irq_disable();
+               }
+       }
+
+       local_irq_enable();
        br_read_unlock(BR_NETPROTO_LOCK);
+       return;
 
-       local_irq_disable();
+softnet_break:
        netdev_rx_stat[this_cpu].time_squeeze++;
-       /* This already runs in BH context, no need to wake up BH's */
-       cpu_raise_softirq(this_cpu, NET_RX_SOFTIRQ);
-       local_irq_enable();
+       __cpu_raise_softirq(this_cpu, NET_RX_SOFTIRQ);
 
-       NET_PROFILE_LEAVE(softnet_process);
-       return;
+       local_irq_enable();
+       br_read_unlock(BR_NETPROTO_LOCK);
 }
 
 static gifconf_func_t * gifconf_list [NPROTO];
@@ -2626,6 +2682,7 @@ int __init net_dev_init(void)
        if (!dev_boot_phase)
                return 0;
 
+
 #ifdef CONFIG_NET_DIVERT
        dv_init();
 #endif /* CONFIG_NET_DIVERT */
@@ -2643,8 +2700,13 @@ int __init net_dev_init(void)
                queue->cng_level = 0;
                queue->avg_blog = 10; /* arbitrary non-zero */
                queue->completion_queue = NULL;
+               INIT_LIST_HEAD(&queue->poll_list);
+               set_bit(__LINK_STATE_START, &queue->backlog_dev.state);
+               queue->backlog_dev.weight = weight_p;
+               queue->backlog_dev.poll = process_backlog;
+               atomic_set(&queue->backlog_dev.refcnt, 1);
        }
-       
+
 #ifdef CONFIG_NET_PROFILE
        net_profile_init();
        NET_PROFILE_REGISTER(dev_queue_xmit);
@@ -2744,7 +2806,6 @@ int __init net_dev_init(void)
 #ifdef CONFIG_NET_SCHED
        pktsched_init();
 #endif
-
        /*
         *      Initialise network devices
         */
index 2f6090a2fc9af301f52f4cfcd510ec957459607e..2e24556de9742e4bc80b32740b4afc0f00bb3d20 100644 (file)
@@ -12,6 +12,7 @@
 #ifdef CONFIG_SYSCTL
 
 extern int netdev_max_backlog;
+extern int weight_p;
 extern int no_cong_thresh;
 extern int no_cong;
 extern int lo_cong;
@@ -47,6 +48,9 @@ ctl_table core_table[] = {
        {NET_CORE_RMEM_DEFAULT, "rmem_default",
         &sysctl_rmem_default, sizeof(int), 0644, NULL,
         &proc_dointvec},
+       {NET_CORE_DEV_WEIGHT, "dev_weight",
+        &weight_p, sizeof(int), 0644, NULL,
+        &proc_dointvec},
        {NET_CORE_MAX_BACKLOG, "netdev_max_backlog",
         &netdev_max_backlog, sizeof(int), 0644, NULL,
         &proc_dointvec},
index c36a2994b3b2ea372d4c1c624f8530033bb86b5d..8c7eb0b0f7621300bf216346a5b76e58573648d4 100644 (file)
@@ -490,6 +490,7 @@ EXPORT_SYMBOL(__kfree_skb);
 EXPORT_SYMBOL(skb_clone);
 EXPORT_SYMBOL(skb_copy);
 EXPORT_SYMBOL(netif_rx);
+EXPORT_SYMBOL(netif_receive_skb);
 EXPORT_SYMBOL(dev_add_pack);
 EXPORT_SYMBOL(dev_remove_pack);
 EXPORT_SYMBOL(dev_get);