Date: Tue, 20 Oct 2015 19:20:42 +0000 (UTC)
From: "Conrad E. Meyer" <cem@FreeBSD.org>
To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject: svn commit: r289651 - head/sys/dev/ntb/if_ntb
Message-ID: <201510201920.t9KJKgaV037894@repo.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: cem Date: Tue Oct 20 19:20:42 2015 New Revision: 289651 URL: https://svnweb.freebsd.org/changeset/base/289651 Log: NTB: MFV da2e5ae5: Fix ntb_transport out-of-order RX update It was possible for a synchronous update of the RX index in the error case to get ahead of the asynchronous RX index update in the normal case. Change the RX processing to preserve an RX completion order. There were two error cases. First, if a buffer is not present to receive data, there would be no queue entry to preserve the RX completion order. Instead of dropping the RX frame, leave the RX frame in the ring. Schedule RX processing when RX entries are enqueued, in case there are RX frames waiting in the ring to be received. Second, if a buffer is too small to receive data, drop the frame in the ring, mark the RX entry as done, and indicate the error in the RX entry length. Check for a negative length in the receive callback in ntb_netdev, and count occurrences as rx_length_errors. Authored by: Allen Hubbe Obtained from: Linux (Dual BSD/GPL driver) Sponsored by: EMC / Isilon Storage Division Modified: head/sys/dev/ntb/if_ntb/if_ntb.c Modified: head/sys/dev/ntb/if_ntb/if_ntb.c ============================================================================== --- head/sys/dev/ntb/if_ntb/if_ntb.c Tue Oct 20 19:20:33 2015 (r289650) +++ head/sys/dev/ntb/if_ntb/if_ntb.c Tue Oct 20 19:20:42 2015 (r289651) @@ -149,10 +149,11 @@ struct ntb_transport_qp { void (*rx_handler)(struct ntb_transport_qp *qp, void *qp_data, void *data, int len); + struct ntb_queue_list rx_post_q; struct ntb_queue_list rx_pend_q; struct ntb_queue_list rx_free_q; - struct mtx ntb_rx_pend_q_lock; - struct mtx ntb_rx_free_q_lock; + /* ntb_rx_q_lock: synchronize access to rx_XXXX_q */ + struct mtx ntb_rx_q_lock; struct task rx_completion_task; struct task rxc_db_work; void *rx_buff; @@ -285,10 +286,11 @@ static void ntb_memcpy_tx(struct ntb_tra struct ntb_queue_entry *entry, void *offset); static void ntb_qp_full(void *arg); 
static void ntb_transport_rxc_db(void *arg, int pending); -static void ntb_rx_pendq_full(void *arg); static int ntb_process_rxc(struct ntb_transport_qp *qp); -static void ntb_rx_copy_task(struct ntb_transport_qp *qp, +static void ntb_memcpy_rx(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry, void *offset); +static inline void ntb_rx_copy_callback(struct ntb_transport_qp *qp, + void *data); static void ntb_complete_rxc(void *arg, int pending); static void ntb_transport_doorbell_callback(void *data, uint32_t vector); static void ntb_transport_event_callback(void *data); @@ -308,6 +310,8 @@ static void ntb_list_add(struct mtx *loc struct ntb_queue_list *list); static struct ntb_queue_entry *ntb_list_rm(struct mtx *lock, struct ntb_queue_list *list); +static struct ntb_queue_entry *ntb_list_mv(struct mtx *lock, + struct ntb_queue_list *from, struct ntb_queue_list *to); static void create_random_local_eui48(u_char *eaddr); static unsigned int ntb_transport_max_size(struct ntb_transport_qp *qp); @@ -681,12 +685,12 @@ ntb_transport_init_queue(struct ntb_tran callout_init(&qp->queue_full, 1); callout_init(&qp->rx_full, 1); - mtx_init(&qp->ntb_rx_pend_q_lock, "ntb rx pend q", NULL, MTX_SPIN); - mtx_init(&qp->ntb_rx_free_q_lock, "ntb rx free q", NULL, MTX_SPIN); + mtx_init(&qp->ntb_rx_q_lock, "ntb rx q", NULL, MTX_SPIN); mtx_init(&qp->ntb_tx_free_q_lock, "ntb tx free q", NULL, MTX_SPIN); TASK_INIT(&qp->rx_completion_task, 0, ntb_complete_rxc, qp); TASK_INIT(&qp->rxc_db_work, 0, ntb_transport_rxc_db, qp); + STAILQ_INIT(&qp->rx_post_q); STAILQ_INIT(&qp->rx_pend_q); STAILQ_INIT(&qp->rx_free_q); STAILQ_INIT(&qp->tx_free_q); @@ -711,10 +715,13 @@ ntb_transport_free_queue(struct ntb_tran qp->tx_handler = NULL; qp->event_handler = NULL; - while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q))) + while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q))) free(entry, M_NTB_IF); - while ((entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q))) + 
while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q))) + free(entry, M_NTB_IF); + + while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_post_q))) free(entry, M_NTB_IF); while ((entry = ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q))) @@ -769,7 +776,7 @@ ntb_transport_create_queue(void *data, s entry->cb_data = nt->ifp; entry->buf = NULL; entry->len = transport_mtu; - ntb_list_add(&qp->ntb_rx_pend_q_lock, entry, &qp->rx_pend_q); + ntb_list_add(&qp->ntb_rx_q_lock, entry, &qp->rx_pend_q); } for (i = 0; i < NTB_QP_DEF_NUM_ENTRIES; i++) { @@ -951,14 +958,6 @@ ntb_qp_full(void *arg) /* Transport Rx */ static void -ntb_rx_pendq_full(void *arg) -{ - - CTR0(KTR_NTB, "RX: ntb_rx_pendq_full callout"); - ntb_transport_rxc_db(arg, 0); -} - -static void ntb_transport_rxc_db(void *arg, int pending __unused) { struct ntb_transport_qp *qp = arg; @@ -1030,7 +1029,7 @@ ntb_process_rxc(struct ntb_transport_qp return (EIO); } - entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q); + entry = ntb_list_mv(&qp->ntb_rx_q_lock, &qp->rx_pend_q, &qp->rx_post_q); if (entry == NULL) { qp->rx_err_no_buf++; CTR0(KTR_NTB, "RX: No entries in rx_pend_q"); @@ -1050,7 +1049,6 @@ ntb_process_rxc(struct ntb_transport_qp entry->len = -EIO; entry->flags |= IF_NTB_DESC_DONE_FLAG; - ntb_list_add(&qp->ntb_rx_free_q_lock, entry, &qp->rx_free_q); taskqueue_enqueue(taskqueue_swi, &qp->rx_completion_task); } else { qp->rx_bytes += hdr->len; @@ -1060,7 +1058,7 @@ ntb_process_rxc(struct ntb_transport_qp entry->len = hdr->len; - ntb_rx_copy_task(qp, entry, offset); + ntb_memcpy_rx(qp, entry, offset); } qp->rx_index++; @@ -1069,7 +1067,7 @@ ntb_process_rxc(struct ntb_transport_qp } static void -ntb_rx_copy_task(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry, +ntb_memcpy_rx(struct ntb_transport_qp *qp, struct ntb_queue_entry *entry, void *offset) { struct ifnet *ifp = entry->cb_data; @@ -1084,15 +1082,18 @@ ntb_rx_copy_task(struct ntb_transport_qp /* Ensure that the data is 
globally visible before clearing the flag */ wmb(); - entry->x_hdr->flags = 0; - /* TODO: replace with bus_space_write */ - qp->rx_info->entry = qp->rx_index; - CTR2(KTR_NTB, - "RX: copied entry %p to mbuf %p. Adding entry to rx_free_q", entry, - m); - ntb_list_add(&qp->ntb_rx_free_q_lock, entry, &qp->rx_free_q); + CTR2(KTR_NTB, "RX: copied entry %p to mbuf %p.", entry, m); + ntb_rx_copy_callback(qp, entry); +} +static inline void +ntb_rx_copy_callback(struct ntb_transport_qp *qp, void *data) +{ + struct ntb_queue_entry *entry; + + entry = data; + entry->flags |= IF_NTB_DESC_DONE_FLAG; taskqueue_enqueue(taskqueue_swi, &qp->rx_completion_task); } @@ -1100,30 +1101,39 @@ static void ntb_complete_rxc(void *arg, int pending) { struct ntb_transport_qp *qp = arg; - struct mbuf *m; struct ntb_queue_entry *entry; + struct mbuf *m; + unsigned len; CTR0(KTR_NTB, "RX: rx_completion_task"); - while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q))) { + mtx_lock_spin(&qp->ntb_rx_q_lock); + + while (!STAILQ_EMPTY(&qp->rx_post_q)) { + entry = STAILQ_FIRST(&qp->rx_post_q); + if ((entry->flags & IF_NTB_DESC_DONE_FLAG) == 0) + break; + + entry->x_hdr->flags = 0; + /* XXX bus_space_write */ + qp->rx_info->entry = entry->index; + + len = entry->len; m = entry->buf; - CTR2(KTR_NTB, "RX: completing entry %p, mbuf %p", entry, m); - if (qp->rx_handler && qp->client_ready) - qp->rx_handler(qp, qp->cb_data, m, entry->len); - entry->buf = NULL; - entry->len = qp->transport->bufsize; + STAILQ_REMOVE_HEAD(&qp->rx_post_q, entry); + STAILQ_INSERT_TAIL(&qp->rx_free_q, entry, entry); - CTR1(KTR_NTB,"RX: entry %p removed from rx_free_q " - "and added to rx_pend_q", entry); - ntb_list_add(&qp->ntb_rx_pend_q_lock, entry, &qp->rx_pend_q); - if (qp->rx_err_no_buf > qp->last_rx_no_buf) { - qp->last_rx_no_buf = qp->rx_err_no_buf; - CTR0(KTR_NTB, "RX: could spawn rx task"); - callout_reset(&qp->rx_full, hz / 1000, ntb_rx_pendq_full, - qp); - } + mtx_unlock_spin(&qp->ntb_rx_q_lock); + + 
CTR2(KTR_NTB, "RX: completing entry %p, mbuf %p", entry, m); + if (qp->rx_handler != NULL && qp->client_ready) + qp->rx_handler(qp, qp->cb_data, m, len); + + mtx_lock_spin(&qp->ntb_rx_q_lock); } + + mtx_unlock_spin(&qp->ntb_rx_q_lock); } static void @@ -1573,6 +1583,26 @@ out: return (entry); } +static struct ntb_queue_entry * +ntb_list_mv(struct mtx *lock, struct ntb_queue_list *from, + struct ntb_queue_list *to) +{ + struct ntb_queue_entry *entry; + + mtx_lock_spin(lock); + if (STAILQ_EMPTY(from)) { + entry = NULL; + goto out; + } + entry = STAILQ_FIRST(from); + STAILQ_REMOVE_HEAD(from, entry); + STAILQ_INSERT_TAIL(to, entry, entry); + +out: + mtx_unlock_spin(lock); + return (entry); +} + /* Helper functions */ /* TODO: This too should really be part of the kernel */ #define EUI48_MULTICAST 1 << 0
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201510201920.t9KJKgaV037894>