• Futex Stack...

    From Chris M. Thomasson@chris.m.thomasson.1@gmail.com to comp.arch on Tue Mar 25 00:11:35 2025
    From Newsgroup: comp.arch

    This is a little C++20 test using a futex (C++20 std::atomic wait/notify)
    to allow one to wait on a lock-free stack. The main stack logic is in
    struct ct_stack. Well, can you get it to compile and run? Thanks... There
    is no need for DWCAS here: it uses just XCHG and CMPXCHG.


    My code:
    ______________________________________
    #include <algorithm>
    #include <atomic>
    #include <cassert>
    #include <cstdlib>
    #include <functional>
    #include <iostream>
    #include <thread>


    // Number of worker threads main() launches (each runs ct_thread).
    #define CT_THREAD_N (42)
    // Number of ct_work items main() pushes through the two stacks.
    #define CT_WORK_N (10000000)
    // Sentinel stored in ct_stack::m_head meaning "empty, and a consumer in
    // flush_wait() has announced it may be blocked in m_head.wait()". push()
    // treats it as an empty stack and calls notify_one() when it replaces it.
    // It is compared against but never dereferenced.
    // NOTE(review): 0xDEADBEEF is odd, so it cannot be the address of a
    // ct_node on platforms where alignof(ct_node) > 1; the integer-to-pointer
    // cast itself is implementation-defined.
    #define CT_WAIT ((ct_node*)0xDEADBEEF)


    // Intrusive singly-linked node; ct_stack chains nodes through m_next.
    // The constructor/destructor stay user-provided so the commented-out
    // trace statements can be re-enabled when debugging node lifetimes.
    struct ct_node
    {
        ct_node* m_next{ nullptr };

        ct_node() { /* std::cout << "(" << this << ")::ct_node::ct_node()\n"; */ }

        ~ct_node() { /* std::cout << "(" << this << ")::ct_node::~ct_node()\n"; */ }
    };


    struct ct_stack
    {
    std::atomic<ct_node*> m_head = { nullptr };

    void
    push(
    ct_node* node
    ) {
    ct_node* head = m_head.load(std::memory_order_relaxed);

    do
    {
    if (head == CT_WAIT)
    {
    node->m_next = nullptr;
    }

    else
    {
    node->m_next = head;
    }

    } while (! m_head.compare_exchange_weak(
    head,
    node,
    std::memory_order_release)
    );

    if (head == CT_WAIT)
    {
    // slow path...
    m_head.notify_one();
    }
    }

    ct_node*
    flush_wait()
    {
    ct_node* head = nullptr;

    for (;;)
    {
    head = m_head.exchange(nullptr, std::memory_order_acquire);

    while (! head || head == CT_WAIT)
    {
    // slow path...
    head = m_head.exchange(CT_WAIT, std::memory_order_acquire);

    if (! head || head == CT_WAIT)
    {
    m_head.wait(CT_WAIT, std::memory_order_relaxed);
    continue;
    }
    }

    break;
    }

    assert(head && head != CT_WAIT);

    return head;
    }
    };


    // A unit of work carried through ct_stack. ct_thread() and main() treat
    // m_state == 666 as the shutdown marker; any other value is ordinary work.
    struct ct_work : ct_node
    {
        unsigned long m_state{ 0 };
    };


    struct ct_shared
    {
    ct_stack m_stack_in;
    ct_stack m_stack_out;

    ct_shared()
    {
    std::cout << "ct_shared::ct_shared()\n" << std::endl;
    }

    ~ct_shared()
    {
    assert(! m_stack_in.m_head || m_stack_in.m_head == CT_WAIT);
    assert(! m_stack_out.m_head || m_stack_out.m_head == CT_WAIT);

    std::cout << "ct_shared::~ct_shared()\n" << std::endl;
    }
    };


    // Worker loop. Repeatedly detaches every pending item from m_stack_in and
    // forwards ordinary work to m_stack_out. A shutdown item (m_state == 666)
    // is pushed back onto m_stack_in so other workers can see it too; the
    // worker returns after finishing the batch in which it saw shutdown.
    void
    ct_thread(
        ct_shared& shared
    ) {
        bool shutting_down = false;

        do
        {
            ct_node* cursor = shared.m_stack_in.flush_wait();

            while (cursor != nullptr)
            {
                ct_work* const work = static_cast<ct_work*>(cursor);

                // Advance first: the push() below rewrites work->m_next.
                cursor = work->m_next;

                if (work->m_state != 666)
                {
                    shared.m_stack_out.push(work);
                    continue;
                }

                // Shutdown detected: recycle the marker for the other workers.
                shutting_down = true;
                shared.m_stack_in.push(work);
            }
        } while (! shutting_down);
    }


    int
    main()
    {
    {
    std::cout << "Futex Stack Test\n";
    std::cout << "by: Chris M. Thomasson\n";
    std::cout << "version: (0.0.1)\n";
    std::cout << "_________________________________\n" << std::endl;
    }

    unsigned long g_ct_work_alloations = 0;
    unsigned long g_ct_work_dealloations = 0;

    {
    std::cout << "CT_THREAD_N = " << CT_THREAD_N << std::endl;
    std::cout << "CT_WORK_N = " << CT_WORK_N << std::endl;
    std::cout << "CT_WAIT = " << CT_WAIT << std::endl;
    }

    {
    ct_shared shared = { };

    std::thread threads[CT_THREAD_N];

    std::cout << "\nLaunching threads...\n" << std::endl;

    for (unsigned long i = 0; i < CT_THREAD_N; ++i)
    {
    threads[i] = std::thread(ct_thread, std::ref(shared));
    }

    //std::this_thread::sleep_for(std::chrono::seconds(2));

    std::cout << "\nGenerate work...\n" << std::endl;

    // Generate work...
    {
    for (unsigned long i = 0; i < CT_WORK_N; ++i)
    {
    shared.m_stack_in.push(new ct_work);
    ++g_ct_work_alloations;
    }
    }

    // Wait for work...
    {
    unsigned long wn = 0;

    while (wn < CT_WORK_N)
    {
    ct_work* head = (ct_work*)shared.m_stack_out.flush_wait();

    while (head)
    {
    ct_work* next = (ct_work*)head->m_next;
    delete head;
    ++g_ct_work_dealloations;
    ++wn;
    head = next;
    }
    }
    }

    std::cout << "\nCompleted all work!\n" << std::endl;
    std::cout << "Sending shutdown state...\n" << std::endl;

    // Send shutdown state...
    {
    for (unsigned long i = 0; i < CT_THREAD_N; ++i)
    {
    ct_work* w = new ct_work;
    ++g_ct_work_alloations;
    w->m_state = 666;
    shared.m_stack_in.push(w);
    }
    }

    // Join threads...
    {
    for (unsigned long i = 0; i < CT_THREAD_N; ++i)
    {
    threads[i].join();
    }
    }

    std::cout << "\nThreads joined...\n" << std::endl;

    // Dump shutdown state...
    std::cout << "Dump shutdown state...\n" << std::endl;

    {
    ct_work* head = (ct_work*)shared.m_stack_in.m_head.load(std::memory_order_relaxed);
    shared.m_stack_in.m_head = nullptr;

    while (head)
    {
    ct_work* next = (ct_work*)head->m_next;
    delete head;
    ++g_ct_work_dealloations;
    head = next;
    }
    }

    std::cout << "\nShutdown complete!\n" << std::endl;
    }

    // Sanity check...
    {
    std::cout << "g_ct_work_alloations = " << g_ct_work_alloations
    << "\n";
    std::cout << "g_ct_work_dealloations = " <<
    g_ct_work_dealloations << std::endl;

    if (g_ct_work_alloations != g_ct_work_dealloations)
    {
    std::cout << "Pardon my French, but shit!!!!!!!!!!!!!! NOT GOOD\n" << std::endl;
    std::cout << "DAMN IT!!!! Grrrrr\n" << std::endl;
    }
    }

    std::cout << "\nFin!\n" << std::endl;

    return 0;
    }
    ______________________________________




    My output:
    _______________________
    Futex Stack Test
    by: Chris M. Thomasson
    version: (0.0.1)
    _________________________________

    CT_THREAD_N = 42
    CT_WORK_N = 10000000
    CT_WAIT = 00000000DEADBEEF
    ct_shared::ct_shared()


    Launching threads...


    Generate work...


    Completed all work!

    Sending shutdown state...


    Threads joined...

    Dump shutdown state...


    Shutdown complete!

    ct_shared::~ct_shared()

    g_ct_work_alloations = 10000042
    g_ct_work_dealloations = 10000042

    Fin!
    _______________________


    Well, any luck? Try to forgive me for misspelling allocations/deallocations
    as alloations/dealloations, damn it. ;^o
    --- Synchronet 3.20c-Linux NewsLink 1.2
  • From Chris M. Thomasson@chris.m.thomasson.1@gmail.com to comp.arch on Fri Mar 28 09:55:24 2025
    From Newsgroup: comp.arch

    On 3/25/2025 12:11 AM, Chris M. Thomasson wrote:
    This is a little C++20 test using a futex to allow one to wait on a lock-free stack. The main stack logic is in struct ct_stack. Well, can
    you get to compile and run? Thanks... There is no need for DWCAS here.
    It uses just XCHG and CMPXCHG.


    My code:
    ______________________________________
    #include <iostream>
    #include <thread>
    #include <atomic>
    #include <algorithm>
    #include <cassert>


    #define CT_THREAD_N (42)
    #define CT_WORK_N (10000000)


    #define CT_WAIT ((ct_node*)0xDEADBEEF)
    ^^^^^^^^^^^^^^^^^^^^^^^
    [...]


    Fwiw, I am using CT_WAIT with the C++20 futex (std::atomic wait/notify) so
    that push() only takes the notify_one() slow path when a consumer may
    actually be waiting. However, this scares me a bit: if the user allocates
    a node at that exact address, it will totally break the algorithm. So, I
    really should create a "stub dummy node" and use its address for the
    wait condition. Something like this pseudo-code:

    // Must be non-const: std::atomic<ct_node*>::exchange() and wait() take a
    // ct_node*, so &g_dummy_wait has to be a ct_node*, not a const ct_node*.
    // Only its address is meant to be used, as a unique wait marker.
    static ct_node g_dummy_wait = { };

    Then change CT_WAIT to (&g_dummy_wait). That would eliminate any
    possibility of that error occurring.
    --- Synchronet 3.20c-Linux NewsLink 1.2
  • From Chris M. Thomasson@chris.m.thomasson.1@gmail.com to comp.arch on Tue Apr 8 23:07:11 2025
    From Newsgroup: comp.arch

    On 3/25/2025 12:11 AM, Chris M. Thomasson wrote:
    This is a little C++20 test using a futex to allow one to wait on a lock-free stack. The main stack logic is in struct ct_stack. Well, can
    you get to compile and run? Thanks... There is no need for DWCAS here.
    It uses just XCHG and CMPXCHG.


    My code:
    ______________________________________

    [...]

    To tidy things up a little, the flush_wait can look like this: ____________________
    ct_node*
    flush_wait()
    {
        // Fast path: detach everything currently on the stack.
        ct_node* batch = m_head.exchange(nullptr, std::memory_order_acquire);

        // Slow path: until we hold a real node, publish CT_WAIT and sleep
        // while m_head still holds it (push() notifies when replacing it).
        while (batch == nullptr || batch == CT_WAIT)
        {
            batch = m_head.exchange(CT_WAIT, std::memory_order_acquire);

            if (batch != nullptr && batch != CT_WAIT)
            {
                break;
            }

            m_head.wait(CT_WAIT, std::memory_order_relaxed);
        }

        assert(batch && batch != CT_WAIT);

        return batch;
    }

    ____________________

    Also, I made a spelling error wrt:

    unsigned long g_ct_work_alloations = 0;
    unsigned long g_ct_work_dealloations = 0;

    grrrrr!
    --- Synchronet 3.20c-Linux NewsLink 1.2