#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <bpf/bpf.h>
#include <scx/common.h>
#include <sys/wait.h>
#include <sched.h>
#include <pthread.h>
#include "scx_test.h"
#include "dequeue.bpf.skel.h"
#define NUM_WORKERS 8
#define AFFINITY_HAMMER_MS 500
static void worker_fn(int id)
{
int i;
volatile int sum = 0;
for (i = 0; i < 1000; i++) {
volatile int j;
for (j = 0; j < 10000; j++)
sum += j;
usleep(1000 + (id * 100));
}
exit(0);
}
static void *affinity_hammer_fn(void *arg)
{
pid_t *pids = arg;
cpu_set_t cpuset;
int i = 0, n = NUM_WORKERS;
struct timespec start, now;
clock_gettime(CLOCK_MONOTONIC, &start);
while (1) {
int w = i % n;
int cpu = (i / n) % 4;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
sched_setaffinity(pids[w], sizeof(cpuset), &cpuset);
i++;
if ((i & 255) == 0) {
long long elapsed_ms;
clock_gettime(CLOCK_MONOTONIC, &now);
elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL +
(now.tv_nsec - start.tv_nsec) / 1000000;
if (elapsed_ms >= AFFINITY_HAMMER_MS)
break;
}
}
return NULL;
}
static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario,
const char *scenario_name)
{
struct bpf_link *link;
pid_t pids[NUM_WORKERS];
pthread_t hammer;
int i, status;
u64 enq_start, deq_start,
dispatch_deq_start, change_deq_start, bpf_queue_full_start;
u64 enq_delta, deq_delta,
dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta;
skel->bss->test_scenario = scenario;
enq_start = skel->bss->enqueue_cnt;
deq_start = skel->bss->dequeue_cnt;
dispatch_deq_start = skel->bss->dispatch_dequeue_cnt;
change_deq_start = skel->bss->change_dequeue_cnt;
bpf_queue_full_start = skel->bss->bpf_queue_full;
link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops);
SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name);
for (i = 0; i < NUM_WORKERS; i++) {
pids[i] = fork();
SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i);
if (pids[i] == 0) {
worker_fn(i);
exit(1);
}
}
SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0,
"Failed to create affinity hammer thread");
pthread_join(hammer, NULL);
for (i = 0; i < NUM_WORKERS; i++) {
SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i],
"Failed to wait for worker %d", i);
SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status);
}
bpf_link__destroy(link);
SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG));
enq_delta = skel->bss->enqueue_cnt - enq_start;
deq_delta = skel->bss->dequeue_cnt - deq_start;
dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start;
change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start;
bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start;
printf("%s:\n", scenario_name);
printf(" enqueues: %lu\n", (unsigned long)enq_delta);
printf(" dequeues: %lu (dispatch: %lu, property_change: %lu)\n",
(unsigned long)deq_delta,
(unsigned long)dispatch_deq_delta,
(unsigned long)change_deq_delta);
printf(" BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta);
if (scenario == 0 || scenario == 1 ||
scenario == 3 || scenario == 4) {
SCX_EQ(enq_delta, 0);
SCX_EQ(deq_delta, 0);
SCX_EQ(dispatch_deq_delta, 0);
SCX_EQ(change_deq_delta, 0);
} else {
SCX_GT(enq_delta, 0);
SCX_GT(deq_delta, 0);
SCX_EQ(enq_delta, deq_delta);
}
return SCX_TEST_PASS;
}
static enum scx_test_status setup(void **ctx)
{
struct dequeue *skel;
skel = dequeue__open();
SCX_FAIL_IF(!skel, "Failed to open skel");
SCX_ENUM_INIT(skel);
SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel");
*ctx = skel;
return SCX_TEST_PASS;
}
static enum scx_test_status run(void *ctx)
{
struct dequeue *skel = ctx;
enum scx_test_status status;
status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()");
if (status != SCX_TEST_PASS)
return status;
status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()");
if (status != SCX_TEST_PASS)
return status;
status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()");
if (status != SCX_TEST_PASS)
return status;
status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()");
if (status != SCX_TEST_PASS)
return status;
status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()");
if (status != SCX_TEST_PASS)
return status;
status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()");
if (status != SCX_TEST_PASS)
return status;
status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()");
if (status != SCX_TEST_PASS)
return status;
printf("\n=== Summary ===\n");
printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt);
printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt);
printf(" Dispatch dequeues: %lu (no flag, normal workflow)\n",
(unsigned long)skel->bss->dispatch_dequeue_cnt);
printf(" Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n",
(unsigned long)skel->bss->change_dequeue_cnt);
printf(" BPF queue full: %lu\n",
(unsigned long)skel->bss->bpf_queue_full);
printf("\nAll scenarios passed - no state machine violations detected\n");
printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n");
printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n");
printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n");
printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n");
printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n");
printf("-> Validated: No duplicate enqueues or invalid state transitions\n");
return SCX_TEST_PASS;
}
static void cleanup(void *ctx)
{
struct dequeue *skel = ctx;
dequeue__destroy(skel);
}
struct scx_test dequeue_test = {
.name = "dequeue",
.description = "Verify ops.dequeue() semantics",
.setup = setup,
.run = run,
.cleanup = cleanup,
};
REGISTER_SCX_TEST(&dequeue_test)