API Reference

This section provides detailed API documentation for all PythonBPF modules, classes, and functions.

Module Overview

PythonBPF is organized into several modules:

  • pythonbpf - Main module with decorators and compilation functions

  • pythonbpf.maps - BPF map types

  • pythonbpf.helper - BPF helper functions

  • pythonbpf.structs - Struct type handling

  • pythonbpf.codegen - Code generation and compilation

Public API

The main pythonbpf module exports the following public API:

from pythonbpf import (
    # Decorators
    bpf,
    map,
    section,
    bpfglobal,
    struct,

    # Compilation
    compile_to_ir,
    compile,
    BPF,

    # Utilities
    trace_pipe,
    trace_fields,
)

Decorators

pythonbpf.decorators.bpf(func)[source]

Decorator to mark a function for BPF compilation.

pythonbpf.decorators.bpfglobal(func)[source]

Decorator to mark a function as a BPF global variable.

pythonbpf.decorators.map(func)[source]

Decorator to mark a function as a BPF map.

pythonbpf.decorators.struct(cls)[source]

Decorator to mark a class as a BPF struct.

pythonbpf.decorators.section(name)[source]
Parameters:

name (str)

bpf

@bpf
def my_function():
    pass

Decorator to mark a function or class for BPF compilation. Any function or class decorated with @bpf will be processed by the PythonBPF compiler.

See also: Decorators

map

@bpf
@map
def my_map() -> HashMap:
    return HashMap(key=c_uint32, value=c_uint64, max_entries=1024)

Decorator to mark a function as a BPF map definition. The function must return a map type.

See also: BPF Maps

section

@bpf
@section("tracepoint/syscalls/sys_enter_open")
def trace_open(ctx: c_void_p) -> c_int64:
    return c_int64(0)

Decorator to specify which kernel hook to attach the BPF program to.

Parameters:

  • name (str) - The section name (e.g., “tracepoint/…”, “kprobe/…”, “xdp”)

See also: Decorators

bpfglobal

@bpf
@bpfglobal
def LICENSE() -> str:
    return "GPL"

Decorator to mark a function as a BPF global variable definition.

See also: Decorators

struct

@bpf
@struct
class Event:
    timestamp: c_uint64
    pid: c_uint32

Decorator to mark a class as a BPF struct definition.

See also: BPF Structs

Compilation Functions

pythonbpf.codegen.finalize_module(original_str)[source]

After all IR generation is complete, we monkey patch btf_ama attribute

pythonbpf.codegen.bpf_passthrough_gen(module)[source]
pythonbpf.codegen.find_bpf_chunks(tree)[source]

Find all functions decorated with @bpf in the AST.

pythonbpf.codegen.processor(source_code, filename, compilation_context)[source]
pythonbpf.codegen.compile_to_ir(filename, output, loglevel=20)[source]
Parameters:
pythonbpf.codegen.compile(loglevel=30)[source]
Return type:

bool

pythonbpf.codegen.BPF(loglevel=30)[source]
Return type:

BpfObject

compile_to_ir()

def compile_to_ir(
    filename: str,
    output: str,
    loglevel=logging.WARNING
) -> None

Compile Python source to LLVM Intermediate Representation.

Parameters:

  • filename (str) - Path to the Python source file

  • output (str) - Path for the output LLVM IR file (.ll)

  • loglevel - Logging level (default: logging.WARNING)

See also: Compilation

compile()

def compile(
    filename: str = None,
    output: str = None,
    loglevel=logging.WARNING
) -> None

Compile Python source to BPF object file.

Parameters:

  • filename (str, optional) - Path to the Python source file (default: calling file)

  • output (str, optional) - Path for the output object file (default: same name with .o extension)

  • loglevel - Logging level (default: logging.WARNING)

See also: Compilation

BPF

class BPF:
    def __init__(
        self,
        filename: str = None,
        loglevel=logging.WARNING
    )

    def load(self) -> BpfObject
    def attach_all(self) -> None
    def load_and_attach(self) -> BpfObject

High-level interface to compile, load, and attach BPF programs.

Parameters:

  • filename (str, optional) - Path to Python source file (default: calling file)

  • loglevel - Logging level (default: logging.WARNING)

Methods:

  • load() - Load the compiled BPF program into the kernel

  • attach_all() - Attach all BPF programs to their hooks

  • load_and_attach() - Convenience method that loads and attaches

See also: Compilation

Utilities

pythonbpf.utils.trace_pipe()[source]

Util to read from the trace pipe.

pythonbpf.utils.trace_fields()[source]

Parse one line from trace_pipe into fields.

trace_pipe()

def trace_pipe() -> None

Read and display output from the kernel trace pipe.

Blocks until interrupted with Ctrl+C. Displays BPF program output from print() statements.

See also: Helper Functions and Utilities

trace_fields()

def trace_fields() -> tuple

Parse one line from the trace pipe into structured fields.

Returns: Tuple of (task, pid, cpu, flags, timestamp, message)

  • task (str) - Task/process name

  • pid (int) - Process ID

  • cpu (int) - CPU number

  • flags (bytes) - Trace flags

  • timestamp (float) - Timestamp in seconds

  • message (str) - The trace message

See also: Helper Functions and Utilities

Map Types

class pythonbpf.maps.maps.HashMap(key, value, max_entries)[source]

Bases: object

__init__(key, value, max_entries)[source]
lookup(key)[source]
delete(key)[source]
update(key, value, flags=None)[source]
class pythonbpf.maps.maps.PerfEventArray(key_size, value_size)[source]

Bases: object

__init__(key_size, value_size)[source]
output(data)[source]
class pythonbpf.maps.maps.RingBuffer(max_entries)[source]

Bases: object

__init__(max_entries)[source]
output(data, flags=0)[source]
reserve(size)[source]
Parameters:

size (int)

submit(data, flags=0)[source]
discard(data, flags=0)[source]

HashMap

class HashMap:
    def __init__(
        self,
        key,
        value,
        max_entries: int
    )

    def lookup(self, key)
    def update(self, key, value, flags=None)
    def delete(self, key)

Hash map for efficient key-value storage.

Parameters:

  • key - The type of the key (ctypes type or struct)

  • value - The type of the value (ctypes type or struct)

  • max_entries (int) - Maximum number of entries

Methods:

  • lookup(key) - Look up a value by key

  • update(key, value, flags=None) - Update or insert a key-value pair

  • delete(key) - Remove an entry from the map

See also: BPF Maps

PerfEventArray

class PerfEventArray:
    def __init__(
        self,
        key_size,
        value_size
    )

    def output(self, data)

Perf event array for sending data to userspace.

Parameters:

  • key_size - Type for the key

  • value_size - Type for the value

Methods:

  • output(data) - Send data to userspace

See also: BPF Maps

RingBuffer

class RingBuffer:
    def __init__(self, max_entries: int)

    def output(self, data, flags=0)
    def reserve(self, size: int)
    def submit(self, data, flags=0)
    def discard(self, data, flags=0)

Ring buffer for efficient event delivery.

Parameters:

  • max_entries (int) - Maximum size in bytes (must be power of 2)

Methods:

  • output(data, flags=0) - Send data to the ring buffer

  • reserve(size) - Reserve space in the buffer

  • submit(data, flags=0) - Submit previously reserved space

  • discard(data, flags=0) - Discard previously reserved space

See also: BPF Maps

Helper Functions

pythonbpf.helper.helpers.ktime()[source]

get current ktime

pythonbpf.helper.helpers.pid()[source]

get current process id

pythonbpf.helper.helpers.deref(ptr)[source]

dereference a pointer

pythonbpf.helper.helpers.comm(buf)[source]

get current process command name

pythonbpf.helper.helpers.probe_read_str(dst, src)[source]

Safely read a null-terminated string from kernel memory

pythonbpf.helper.helpers.random()[source]

get a pseudorandom u32 number

pythonbpf.helper.helpers.probe_read(dst, size, src)[source]

Safely read data from kernel memory

pythonbpf.helper.helpers.smp_processor_id()[source]

get the current CPU id

pythonbpf.helper.helpers.uid()[source]

get current user id

pythonbpf.helper.helpers.skb_store_bytes(offset, from_buf, size, flags=0)[source]

store bytes into a socket buffer

pythonbpf.helper.helpers.get_stack(buf, flags=0)[source]

get the current stack trace

pythonbpf.helper.helpers.get_current_cgroup_id()[source]

Get the current cgroup ID

Process Information

  • pid() - Get current process ID

  • comm(buf) - Get current process command name (requires buffer parameter)

  • uid() - Get current user ID

Time

  • ktime() - Get current kernel time in nanoseconds

CPU

  • smp_processor_id() - Get current CPU ID

Memory

  • probe_read(dst, size, src) - Safely read kernel memory

  • probe_read_str(dst, src) - Safely read string from kernel memory

  • deref(ptr) - Dereference a pointer

Random

  • random() - Get pseudo-random number

See also: Helper Functions and Utilities

Type System

PythonBPF uses Python’s ctypes module for type definitions:

Integer Types

  • c_int8, c_int16, c_int32, c_int64 - Signed integers

  • c_uint8, c_uint16, c_uint32, c_uint64 - Unsigned integers

Other Types

  • c_char, c_bool - Characters and booleans

  • c_void_p - Void pointers

  • str(N) - Fixed-length strings

Examples

Basic Usage

from pythonbpf import bpf, section, bpfglobal, BPF, trace_pipe
from ctypes import c_void_p, c_int64

@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def hello(ctx: c_void_p) -> c_int64:
    print("Hello, World!")
    return 0

@bpf
@bpfglobal
def LICENSE() -> str:
    return "GPL"

b = BPF()
b.load_and_attach()
trace_pipe()

With Maps

from pythonbpf import bpf, map, section, bpfglobal, BPF
from pythonbpf.maps import HashMap
from pythonbpf.helper import pid
from ctypes import c_void_p, c_int64, c_uint32, c_uint64

@bpf
@map
def counters() -> HashMap:
    return HashMap(key=c_uint32, value=c_uint64, max_entries=256)

@bpf
@section("tracepoint/syscalls/sys_enter_clone")
def count_clones(ctx: c_void_p) -> c_int64:
    process_id = pid()
    count = counters.lookup(process_id)

    if count:
        counters.update(process_id, count + 1)
    else:
        counters.update(process_id, 1)

    return 0

@bpf
@bpfglobal
def LICENSE() -> str:
    return "GPL"

b = BPF()
b.load_and_attach()

With Structs

from pythonbpf import bpf, struct, map, section, bpfglobal, BPF
from pythonbpf.maps import RingBuffer
from pythonbpf.helper import pid, ktime, comm
from ctypes import c_void_p, c_int64, c_uint32, c_uint64

@bpf
@struct
class Event:
    timestamp: c_uint64
    pid: c_uint32
    comm: str(16)

@bpf
@map
def events() -> RingBuffer:
    return RingBuffer(max_entries=4096)

@bpf
@section("tracepoint/syscalls/sys_enter_execve")
def track_exec(ctx: c_void_p) -> c_int64:
    event = Event()
    event.timestamp = ktime()
    event.pid = pid()
    comm(event.comm)

    events.output(event)
    return 0

@bpf
@bpfglobal
def LICENSE() -> str:
    return "GPL"

b = BPF()
b.load_and_attach()

See Also