
Commit fc168d1

Jason Xing authored and Kernel Patches Daemon committed
xsk: introduce a dedicated local completion queue for each xsk
Before commit 30f241f ("xsk: Fix immature cq descriptor production"), there was an issue [1] that caused descriptors to be published to the completion queue incorrectly under a race condition. That commit fixes the issue, but it adds more memory operations in the xmit hot path and in interrupt context, which can hurt performance. Building on the existing infrastructure, this patch proposes a new solution to the problem: use pre-allocated memory, namely a local completion queue, to avoid frequently calling memory functions. The benefit comes from replacing xsk_tx_generic_cache with the local cq.

The core logic is as follows:
1. Allocate a new local completion queue when setting the real queue.
2. Write the descriptors into the local cq in the xmit path, and record the producer position as @start_pos, the start position of the skb in this queue, so that in the destruction phase the skb can easily write its desc addr(s) from the local cq into the real cq.
3. Initialize the upper 24 bits of destructor_arg to store @start_pos in xsk_skb_init_misc().
4. Initialize the lower 8 bits of destructor_arg to store the number of descriptors the skb owns in xsk_inc_num_desc().
5. In xsk_destruct_skb(), write the desc addr(s) starting at @start_pos from the local cq one by one into the real cq, then sync the global state of the cq as before.

The format of destructor_arg is designed as:

 ------------------------ --------
|        start_pos       |  num   |
 ------------------------ --------

The upper 24 bits are enough to index the temporary descriptors, and the lower 8 bits are enough to hold the number of descriptors one skb owns, which stays within MAX_SKB_FRAGS.

[1]: https://lore.kernel.org/all/20250530095957.43248-1-e.kubanski@partner.samsung.com/

Signed-off-by: Jason Xing <kernelxing@tencent.com>
1 parent 73cd98f commit fc168d1
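
The destructor_arg encoding above is simple enough to sanity-check in isolation. Below is a minimal userspace C sketch of the pack/unpack logic, assuming the same shift and mask the patch adds; the pack_destructor_arg()/unpack_*() helpers are hypothetical names for illustration, not part of the patch.

#include <stdint.h>
#include <stdio.h>

/* Same values as the macros the patch adds to net/xdp/xsk.c. */
#define XSK_DESTRUCTOR_DESCS_SHIFT 8
#define XSK_DESTRUCTOR_DESCS_MASK ((1ULL << XSK_DESTRUCTOR_DESCS_SHIFT) - 1)

/* Hypothetical helper: pack the skb's start position in the local cq
 * into the upper bits and the descriptor count into the lower 8 bits,
 * mirroring what xsk_skb_destructor_set_addr() and xsk_inc_num_desc()
 * do to skb_shinfo(skb)->destructor_arg.
 */
static long pack_destructor_arg(uint32_t start_pos, uint8_t num)
{
	return ((long)start_pos << XSK_DESTRUCTOR_DESCS_SHIFT) | num;
}

static uint32_t unpack_start_pos(long val)
{
	return (uint32_t)(val >> XSK_DESTRUCTOR_DESCS_SHIFT);
}

static uint8_t unpack_num(long val)
{
	return (uint8_t)(val & XSK_DESTRUCTOR_DESCS_MASK);
}

int main(void)
{
	/* An skb whose first desc sits at slot 1000 and that owns 3 descs. */
	long arg = pack_destructor_arg(1000, 3);

	printf("start_pos=%u num=%u\n", unpack_start_pos(arg), unpack_num(arg));
	return 0;
}

Note that the kernel code never packs both fields at once: xsk_skb_destructor_set_addr() stores the position with num = 0, and each xsk_inc_num_desc() call then bumps the low byte by one, which never carries into the position bits because num stays within MAX_SKB_FRAGS.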

File tree

1 file changed (+53, -105)

net/xdp/xsk.c

Lines changed: 53 additions & 105 deletions
@@ -41,8 +41,6 @@ struct xsk_addrs {
 	u64 addrs[MAX_SKB_FRAGS + 1];
 };
 
-static struct kmem_cache *xsk_tx_generic_cache;
-
 void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
 {
 	if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
@@ -539,81 +537,87 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
+static int xsk_cq_reserve_addr_locked(struct xdp_sock *xs, u64 addr)
 {
+	struct xsk_buff_pool *pool = xs->pool;
+	struct local_cq *lcq = xs->lcq;
 	int ret;
 
 	spin_lock(&pool->cq_cached_prod_lock);
 	ret = xskq_prod_reserve(pool->cq);
 	spin_unlock(&pool->cq_cached_prod_lock);
+	if (!ret)
+		lcq->desc[lcq->prod++ & lcq->ring_mask] = addr;
 
 	return ret;
 }
 
-static bool xsk_skb_destructor_is_addr(struct sk_buff *skb)
+#define XSK_DESTRUCTOR_DESCS_SHIFT 8
+#define XSK_DESTRUCTOR_DESCS_MASK \
+	((1ULL << XSK_DESTRUCTOR_DESCS_SHIFT) - 1)
+
+static long xsk_get_destructor_arg(struct sk_buff *skb)
 {
-	return (uintptr_t)skb_shinfo(skb)->destructor_arg & 0x1UL;
+	return (long)skb_shinfo(skb)->destructor_arg;
 }
 
-static u64 xsk_skb_destructor_get_addr(struct sk_buff *skb)
+static u8 xsk_get_num_desc(struct sk_buff *skb)
 {
-	return (u64)((uintptr_t)skb_shinfo(skb)->destructor_arg & ~0x1UL);
+	long val = xsk_get_destructor_arg(skb);
+
+	return (u8)val & XSK_DESTRUCTOR_DESCS_MASK;
 }
 
-static void xsk_skb_destructor_set_addr(struct sk_buff *skb, u64 addr)
+/* Record the position of first desc in local cq */
+static void xsk_skb_destructor_set_addr(struct sk_buff *skb,
+					struct xdp_sock *xs)
 {
-	skb_shinfo(skb)->destructor_arg = (void *)((uintptr_t)addr | 0x1UL);
+	long val;
+
+	val = ((xs->lcq->prod - 1) & xs->lcq->ring_mask) << XSK_DESTRUCTOR_DESCS_SHIFT;
+	skb_shinfo(skb)->destructor_arg = (void *)val;
 }
 
+/* Only update the lower bits to adjust number of descriptors the skb
+ * carries. We have enough bits to increase the value of number of
+ * descriptors that should be within MAX_SKB_FRAGS, so increase it by
+ * one directly.
+ */
 static void xsk_inc_num_desc(struct sk_buff *skb)
 {
-	struct xsk_addrs *xsk_addr;
+	long val = xsk_get_destructor_arg(skb) + 1;
 
-	if (!xsk_skb_destructor_is_addr(skb)) {
-		xsk_addr = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
-		xsk_addr->num_descs++;
-	}
+	skb_shinfo(skb)->destructor_arg = (void *)val;
 }
 
-static u32 xsk_get_num_desc(struct sk_buff *skb)
+static u32 xsk_get_start_addr(struct sk_buff *skb)
 {
-	struct xsk_addrs *xsk_addr;
+	long val = xsk_get_destructor_arg(skb);
 
-	if (xsk_skb_destructor_is_addr(skb))
-		return 1;
+	return val >> XSK_DESTRUCTOR_DESCS_SHIFT;
+}
 
-	xsk_addr = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+static void xsk_cq_write_addr(struct sk_buff *skb, u32 desc_processed)
+{
+	struct xsk_buff_pool *pool = xdp_sk(skb->sk)->pool;
+	u32 idx, addr, pos = xsk_get_start_addr(skb);
+	struct xdp_sock *xs = xdp_sk(skb->sk);
 
-	return xsk_addr->num_descs;
+	idx = xskq_get_prod(pool->cq) + desc_processed;
+	addr = xs->lcq->desc[(pos + desc_processed) & xs->lcq->ring_mask];
+	xskq_prod_write_addr(pool->cq, idx, addr);
 }
 
-static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
-				      struct sk_buff *skb)
+static void xsk_cq_submit_addr_locked(struct sk_buff *skb)
 {
-	u32 num_descs = xsk_get_num_desc(skb);
-	struct xsk_addrs *xsk_addr;
-	u32 descs_processed = 0;
+	struct xsk_buff_pool *pool = xdp_sk(skb->sk)->pool;
+	u8 i, num = xsk_get_num_desc(skb);
 	unsigned long flags;
-	u32 idx, i;
 
 	spin_lock_irqsave(&pool->cq_prod_lock, flags);
-	idx = xskq_get_prod(pool->cq);
-
-	if (unlikely(num_descs > 1)) {
-		xsk_addr = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
-
-		for (i = 0; i < num_descs; i++) {
-			xskq_prod_write_addr(pool->cq, idx + descs_processed,
-					     xsk_addr->addrs[i]);
-			descs_processed++;
-		}
-		kmem_cache_free(xsk_tx_generic_cache, xsk_addr);
-	} else {
-		xskq_prod_write_addr(pool->cq, idx,
-				     xsk_skb_destructor_get_addr(skb));
-		descs_processed++;
-	}
-	xskq_prod_submit_n(pool->cq, descs_processed);
+	for (i = 0; i < num; i++)
+		xsk_cq_write_addr(skb, i);
+	xskq_prod_submit_n(pool->cq, num);
 	spin_unlock_irqrestore(&pool->cq_prod_lock, flags);
 }
 
@@ -634,30 +638,23 @@ void xsk_destruct_skb(struct sk_buff *skb)
 		*compl->tx_timestamp = ktime_get_tai_fast_ns();
 	}
 
-	xsk_cq_submit_addr_locked(xdp_sk(skb->sk)->pool, skb);
+	xsk_cq_submit_addr_locked(skb);
 	sock_wfree(skb);
 }
 
-static void xsk_skb_init_misc(struct sk_buff *skb, struct xdp_sock *xs,
-			      u64 addr)
+static void xsk_skb_init_misc(struct sk_buff *skb, struct xdp_sock *xs)
 {
 	skb->dev = xs->dev;
 	skb->priority = READ_ONCE(xs->sk.sk_priority);
 	skb->mark = READ_ONCE(xs->sk.sk_mark);
 	skb->destructor = xsk_destruct_skb;
-	xsk_skb_destructor_set_addr(skb, addr);
+	xsk_skb_destructor_set_addr(skb, xs);
 }
 
 static void xsk_consume_skb(struct sk_buff *skb)
 {
 	struct xdp_sock *xs = xdp_sk(skb->sk);
 	u32 num_descs = xsk_get_num_desc(skb);
-	struct xsk_addrs *xsk_addr;
-
-	if (unlikely(num_descs > 1)) {
-		xsk_addr = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
-		kmem_cache_free(xsk_tx_generic_cache, xsk_addr);
-	}
 
 	skb->destructor = sock_wfree;
 	xsk_cq_cancel_locked(xs->pool, num_descs);
@@ -734,33 +731,12 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 
 		skb_reserve(skb, hr);
 
-		xsk_skb_init_misc(skb, xs, desc->addr);
+		xsk_skb_init_misc(skb, xs);
 		if (desc->options & XDP_TX_METADATA) {
 			err = xsk_skb_metadata(skb, buffer, desc, pool, hr);
 			if (unlikely(err))
 				return ERR_PTR(err);
 		}
-	} else {
-		struct xsk_addrs *xsk_addr;
-
-		if (xsk_skb_destructor_is_addr(skb)) {
-			xsk_addr = kmem_cache_zalloc(xsk_tx_generic_cache,
-						     GFP_KERNEL);
-			if (!xsk_addr)
-				return ERR_PTR(-ENOMEM);
-
-			xsk_addr->num_descs = 1;
-			xsk_addr->addrs[0] = xsk_skb_destructor_get_addr(skb);
-			skb_shinfo(skb)->destructor_arg = (void *)xsk_addr;
-		} else {
-			xsk_addr = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
-		}
-
-		/* in case of -EOVERFLOW that could happen below,
-		 * xsk_consume_skb() will release this node as whole skb
-		 * would be dropped, which implies freeing all list elements
-		 */
-		xsk_addr->addrs[xsk_addr->num_descs] = desc->addr;
 	}
 
 	len = desc->len;
@@ -828,7 +804,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 		if (unlikely(err))
 			goto free_err;
 
-		xsk_skb_init_misc(skb, xs, desc->addr);
+		xsk_skb_init_misc(skb, xs);
 		if (desc->options & XDP_TX_METADATA) {
 			err = xsk_skb_metadata(skb, buffer, desc,
 					       xs->pool, hr);
@@ -837,25 +813,9 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 		}
 	} else {
 		int nr_frags = skb_shinfo(skb)->nr_frags;
-		struct xsk_addrs *xsk_addr;
 		struct page *page;
 		u8 *vaddr;
 
-		if (xsk_skb_destructor_is_addr(skb)) {
-			xsk_addr = kmem_cache_zalloc(xsk_tx_generic_cache,
-						     GFP_KERNEL);
-			if (!xsk_addr) {
-				err = -ENOMEM;
-				goto free_err;
-			}
-
-			xsk_addr->num_descs = 1;
-			xsk_addr->addrs[0] = xsk_skb_destructor_get_addr(skb);
-			skb_shinfo(skb)->destructor_arg = (void *)xsk_addr;
-		} else {
-			xsk_addr = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
-		}
-
 		if (unlikely(nr_frags == (MAX_SKB_FRAGS - 1) && xp_mb_desc(desc))) {
 			err = -EOVERFLOW;
 			goto free_err;
@@ -873,8 +833,6 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 
 			skb_add_rx_frag(skb, nr_frags, page, 0, len, PAGE_SIZE);
 			refcount_add(PAGE_SIZE, &xs->sk.sk_wmem_alloc);
-
-			xsk_addr->addrs[xsk_addr->num_descs] = desc->addr;
 		}
 	}
 
@@ -931,7 +889,7 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		err = xsk_cq_reserve_locked(xs->pool);
+		err = xsk_cq_reserve_addr_locked(xs, desc.addr);
 		if (err) {
 			err = -EAGAIN;
 			goto out;
@@ -1984,18 +1942,8 @@ static int __init xsk_init(void)
 	if (err)
 		goto out_pernet;
 
-	xsk_tx_generic_cache = kmem_cache_create("xsk_generic_xmit_cache",
-						 sizeof(struct xsk_addrs),
-						 0, SLAB_HWCACHE_ALIGN, NULL);
-	if (!xsk_tx_generic_cache) {
-		err = -ENOMEM;
-		goto out_unreg_notif;
-	}
-
 	return 0;
 
-out_unreg_notif:
-	unregister_netdevice_notifier(&xsk_netdev_notifier);
 out_pernet:
 	unregister_pernet_subsys(&xsk_net_ops);
 out_sk:
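
The diff consumes xs->lcq but does not show its definition, which is introduced elsewhere in the series. Judging only from the accesses above (lcq->prod, lcq->ring_mask, lcq->desc[... & ring_mask]), the structure is presumably shaped roughly like the sketch below; this is an inference from usage, not the actual definition.

/* Inferred shape of the per-socket local completion queue; the real
 * definition is introduced by another patch in the series.
 */
struct local_cq {
	u32 prod;	/* free-running producer counter, wrapped by ring_mask */
	u32 ring_mask;	/* ring size minus one; size must be a power of two */
	u64 desc[];	/* staged UMEM addresses, copied to the real cq on skb destruction */
};

Because xsk_cq_reserve_addr_locked() only advances lcq->prod after xskq_prod_reserve() succeeds on the real cq, the number of staged-but-unsubmitted local entries appears to be bounded by the real cq's capacity, which suggests the local ring is sized to match the real cq so that wrapping never overwrites entries that have not yet been flushed in xsk_destruct_skb().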
