
API Reference

Overview

PyBurgers: 1D Stochastic Burgers Equation Solver.

A tool for studying Burgers turbulence using DNS and LES.

Core Solvers

Burgers

Burgers(input_obj: Input, output_obj: Output)

Bases: ABC

Abstract base class for Burgers equation solvers.

Provides common functionality for solving the 1D stochastic Burgers equation using Fourier collocation for spatial derivatives and Williamson (1980) low-storage RK3 time integration with CFL-based adaptive time stepping.

Subclasses must implement:
  • _get_nx(): Return the grid resolution for this mode
  • _create_spectral_workspace(): Create the spectral workspace for this mode
  • _setup_mode_specific(): Initialize mode-specific components
  • _setup_output_fields(): Configure output fields dictionary
  • _compute_noise(): Generate/process noise for this time step
  • _compute_rhs(): Compute the right-hand side of the equation
  • _save_diagnostics(): Compute and save mode-specific diagnostics
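
For orientation, here is a minimal sketch of what a subclass might look like. It is not one of the shipped solvers: the hook signatures are inferred from how __init__ and run() (shown below) call them, the mode_name class attribute is assumed from its use in __init__, and the right-hand side shown is a plain viscous Burgers balance.

# Hypothetical subclass sketch; signatures inferred from the calls in core.py,
# not copied from the real abstract method definitions.
import numpy as np

from pyburgers.core import Burgers
from pyburgers.utils.spectral_workspace import SpectralWorkspace


class MinimalBurgers(Burgers):
    mode_name = "minimal"  # assumed: __init__ uses this for the logger name

    def _get_nx(self) -> int:
        return self.input.grid.dns.points

    def _create_spectral_workspace(self) -> SpectralWorkspace:
        return SpectralWorkspace(
            nx=self.nx,
            dx=self.dx,
            noise_beta=self.noise_beta,
            fftw_planning=self.fftw_planning,
            fftw_threads=self.fftw_threads,
        )

    def _setup_mode_specific(self) -> None:
        pass  # no SGS model or extra state in this sketch

    def _setup_output_fields(self) -> dict:
        return {"u": self.u, "tke": self.tke}

    def _compute_noise(self) -> np.ndarray:
        return self.noise_amp * self.spectral.noise.compute_noise()

    def _compute_rhs(self, derivatives, noise, dt) -> np.ndarray:
        # du/dt = -0.5 d(u^2)/dx + nu d2u/dx2 + forcing
        # keys assume the base class computed orders ["sq", 2]; dt is unused here
        return -0.5 * derivatives["sq"] + self.visc * derivatives["2"] + noise

    def _save_diagnostics(self, derivatives, save_idx, t) -> None:
        self.tke[0] = 0.5 * np.mean(self.u**2)
        self.output.save(self.output_fields, save_idx, t)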

Attributes:

Name Type Description
input

Input configuration object.

output

Output handler for NetCDF writing.

nx

Number of grid points.

dx

Grid spacing.

visc

Kinematic viscosity.

noise_amp

Noise amplitude.

cfl_target

Target CFL number.

max_step

Maximum allowed time step.

t_duration

Total simulation time.

t_save

Output save interval in physical time.

t_print

Progress print interval in physical time.

Initialize the Burgers solver.

Parameters:

Name Type Description Default
input_obj Input

Input configuration containing simulation parameters.

required
output_obj Output

Output handler for writing results to NetCDF.

required
Source code in pyburgers/core.py
def __init__(self, input_obj: Input, output_obj: Output) -> None:
    """Initialize the Burgers solver.

    Args:
        input_obj: Input configuration containing simulation parameters.
        output_obj: Output handler for writing results to NetCDF.
    """
    self.logger: logging.Logger = get_logger(self.mode_name)
    self.logger.info("You are running in %s mode", self.mode_name)

    # Initialize random number generator for reproducibility
    np.random.seed(1)

    # Store input/output objects
    self.logger.debug("Reading input settings")
    self.input = input_obj
    self.output = output_obj

    # Extract common configuration
    self.visc = input_obj.viscosity
    self.noise_amp = input_obj.physics.noise.amplitude
    self.noise_beta = input_obj.physics.noise.exponent
    self.fftw_planning = input_obj.fftw_planning
    self.fftw_threads = input_obj.fftw_threads
    self.domain_length = input_obj.domain_length

    # Adaptive time stepping parameters
    self.cfl_target = input_obj.cfl_target
    self.max_step = input_obj.max_step
    self.t_duration = input_obj.time.duration
    self.t_save = input_obj.t_save
    self.t_print = input_obj.t_print

    # Get mode-specific grid resolution
    self.nx = self._get_nx()
    self.mp = self.nx // 2
    self.dx = self.domain_length / self.nx

    # Precompute viscous stability limit (constant for the run)
    self._dt_visc = 0.2 * self.dx**2 / self.visc

    # Hyperviscosity: auto-compute coefficient as dx⁴ when enabled
    # This scaling ensures consistent damping across resolutions
    # and negligible impact on the timestep
    if input_obj.hyperviscosity_enabled:
        self.hypervisc = self.dx**4
        self._dt_hypervisc = 0.1 * self.dx**4 / self.hypervisc
        self.logger.info(
            "Hyperviscosity enabled: coefficient = %.2e",
            self.hypervisc,
        )
    else:
        self.hypervisc = 0.0
        self._dt_hypervisc = float("inf")

    # Create spectral workspace (bundles Derivatives, Dealias, Filter)
    self.spectral = self._create_spectral_workspace()

    # Grid coordinates
    self.x = np.arange(0, self.domain_length, self.dx)

    # Reference workspace buffers (zero-copy)
    self.u = self.spectral.u
    self.fu = self.spectral.fu

    # Initialize velocity field to zero
    self.u[:] = 0

    # Common output field
    self.tke = np.zeros(1)

    # Mode-specific setup (noise, SGS, etc.)
    self._setup_mode_specific()

    # Setup output
    self.output_dims = {"t": 0, "x": self.nx}
    self.output.set_dims(self.output_dims)

    self.output_fields = self._setup_output_fields()
    self.output.set_fields(self.output_fields)

    # Write initial data
    self.output.save(self.output_fields, 0, 0, initial=True)

run

run() -> None

Execute the time integration loop.

Advances the simulation using Williamson (1980) low-storage RK3 with CFL-based adaptive time stepping. Output is written at exact multiples of t_save by clamping dt to hit output times.

Source code in pyburgers/core.py
def run(self) -> None:
    """Execute the time integration loop.

    Advances the simulation using Williamson (1980) low-storage RK3
    with CFL-based adaptive time stepping. Output is written at
    exact multiples of t_save by clamping dt to hit output times.
    """
    # Williamson (1980) low-storage RK3 coefficients
    A = (0.0, -5.0 / 9.0, -153.0 / 128.0)
    B = (1.0 / 3.0, 15.0 / 16.0, 8.0 / 15.0)

    t_current = 0.0
    t_next_save = self.t_save
    t_next_print = self.t_print
    save_idx = 0
    Q = np.zeros_like(self.u)

    # Sample noise at fixed max_step intervals so that DNS and LES
    # consume the same random sequence regardless of adaptive dt.
    noise = self._compute_noise()
    t_next_noise = self.max_step

    while t_current < self.t_duration - 1e-14:
        dt = self._compute_dt()

        # Clamp to hit next output time or end time exactly
        if t_current + dt >= t_next_save - 1e-14:
            dt = t_next_save - t_current
        if t_current + dt > self.t_duration:
            dt = self.t_duration - t_current
        if dt < 1e-15:
            break

        is_output_step = abs(t_current + dt - t_next_save) < 1e-14

        # 3-stage RK3
        Q[:] = 0.0
        for stage in range(3):
            derivatives = self._compute_derivatives(False)
            rhs = self._compute_rhs(derivatives, noise, dt)
            Q[:] = A[stage] * Q + rhs
            self.u[:] = self.u + B[stage] * dt * Q

            # Zero Nyquist after each stage
            self.spectral.derivatives.fft()
            self.fu[self.mp] = 0
            self.spectral.derivatives.ifft_nyquist()

        t_current += dt

        # Refresh noise at fixed intervals
        if t_current >= t_next_noise - 1e-14:
            noise = self._compute_noise()
            t_next_noise += self.max_step

        # Progress logging
        self._log_progress(t_current, t_next_print)
        if t_current >= t_next_print - 1e-14:
            t_next_print += self.t_print

        # Output at exact save times
        if is_output_step:
            save_idx += 1
            derivatives = self._compute_derivatives(True)
            t_exact = save_idx * self.t_save
            self._save_diagnostics(derivatives, save_idx, t_exact)
            t_next_save += self.t_save

    if self.logger.isEnabledFor(logging.INFO):
        print()

DNS

DNS(input_obj: Input, output_obj: Output)

Bases: Burgers

Direct numerical simulation solver for the Burgers equation.

Solves the 1D stochastic Burgers equation at full resolution using Fourier collocation for spatial derivatives and RK3 time integration.

This class inherits common functionality from Burgers and implements DNS-specific behavior for noise generation and diagnostics. Uses a SpectralWorkspace with Derivatives and Dealias utilities (no Filter needed for DNS).
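
A minimal usage sketch. Import paths follow the source locations listed in this reference; the Output constructor is not documented in this section, so the filename argument below is a placeholder.

from pyburgers.dns import DNS
from pyburgers.utils.io.input import Input
from pyburgers.utils.io.output import Output  # path assumed; Output is not documented here

config = Input("namelist.json")        # JSON namelist (see Input below)
writer = Output("pyburgers_dns.nc")    # hypothetical constructor argument
dns = DNS(config, writer)
dns.run()                              # integrate to time.duration, writing every t_save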

Initialize the DNS solver.

Parameters:

Name Type Description Default
input_obj Input

Input configuration containing simulation parameters.

required
output_obj Output

Output handler for writing results to NetCDF.

required
Source code in pyburgers/dns.py
def __init__(self, input_obj: Input, output_obj: Output) -> None:
    """Initialize the DNS solver.

    Args:
        input_obj: Input configuration containing simulation parameters.
        output_obj: Output handler for writing results to NetCDF.
    """
    super().__init__(input_obj, output_obj)

LES

LES(input_obj: Input, output_obj: Output)

Bases: Burgers

Large-eddy simulation solver for the Burgers equation.

Solves the filtered 1D stochastic Burgers equation using Fourier collocation for spatial derivatives, RK3 time integration, and subgrid-scale models for unresolved turbulence.

This class inherits common functionality from Burgers and implements LES-specific behavior including SGS modeling and filtered noise.

Attributes:

Name Type Description
nx_dns

Number of DNS grid points (for noise generation).

sgs_model_id

SGS model type identifier (0-4).

spectral

SpectralWorkspace with derivatives, dealias, and filter.

subgrid

SGS model instance.

tke_sgs

Subgrid TKE (for Deardorff model).
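
A short sketch mirroring the DNS example above; the SGS model is selected through physics.subgrid_model in the namelist, and the Output construction is again a placeholder.

from pyburgers.les import LES
from pyburgers.utils.io.input import Input
from pyburgers.utils.io.output import Output  # path assumed, as in the DNS example

config = Input("namelist.json")
writer = Output("pyburgers_les.nc")    # hypothetical constructor argument
les = LES(config, writer)
print(les.nx, les.nx_dns, les.sgs_model_id)  # LES grid, DNS grid for noise, SGS model id
les.run()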

Initialize the LES solver.

Parameters:

Name Type Description Default
input_obj Input

Input configuration containing simulation parameters.

required
output_obj Output

Output handler for writing results to NetCDF.

required
Source code in pyburgers/les.py
def __init__(self, input_obj: Input, output_obj: Output) -> None:
    """Initialize the LES solver.

    Args:
        input_obj: Input configuration containing simulation parameters.
        output_obj: Output handler for writing results to NetCDF.
    """
    # Store LES-specific config before calling parent __init__
    # (needed because _create_spectral_workspace is called during parent init)
    self.nx_dns = input_obj.grid.dns.points
    self.sgs_model_id = input_obj.physics.subgrid_model

    super().__init__(input_obj, output_obj)

Data Models

data_models

Core Data Models for PyBurgers.

This module defines the core data structures used throughout PyBurgers. These structures are implemented as Python dataclasses to provide a clear and robust way to manage the model's state and configuration.

DNSConfig dataclass

DNSConfig(points: int)

Direct numerical simulation configuration.

Attributes:

Name Type Description
points int

Number of grid points.

FFTWConfig dataclass

FFTWConfig(planning: str, threads: int)

FFTW parameters.

Attributes:

Name Type Description
planning str

FFTW planning approach.

threads int

Number of threads to use.

LESConfig dataclass

LESConfig(points: int)

Large-eddy simulation configuration.

Attributes:

Name Type Description
points int

Number of grid points.

GridConfig dataclass

GridConfig(length: float, dns: DNSConfig, les: LESConfig)

Grid configurations.

Attributes:

Name Type Description
length float

Domain length (periodic) [m].

dns DNSConfig

Direct numerical simulation grid configuration.

les LESConfig

Large-eddy simulation grid configuration.

LoggingConfig dataclass

LoggingConfig(level: str, file: str | None = None)

Logging settings.

Attributes:

Name Type Description
level str

Logging level for the simulation (e.g., 'info', 'debug').

file str | None

Optional log file path for file logging.

HyperviscosityConfig dataclass

HyperviscosityConfig(enabled: bool = False)

Hyperviscosity parameters for high-k damping.

Adds a -ν₄∇⁴u term that provides k⁴ dissipation at high wavenumbers to prevent spectral pile-up near the Nyquist frequency.

When enabled, the coefficient is auto-computed as ν₄ = dx⁴ to provide appropriate damping that scales correctly with grid resolution and does not limit the simulation timestep.
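
As a quick sanity check of the scaling (nx = 512 on the default 2π domain is just an example; printed values are rounded):

import numpy as np

nx = 512
dx = 2 * np.pi / nx       # grid spacing on the default 2*pi domain
nu4 = dx**4               # auto-computed hyperviscosity coefficient
print(f"dx = {dx:.3e}, nu4 = {nu4:.3e}")  # dx = 1.227e-02, nu4 = 2.268e-08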

Attributes:

Name Type Description
enabled bool

Whether hyperviscosity is enabled.

NoiseConfig dataclass

NoiseConfig(exponent: float, amplitude: float)

Noise method parameters.

Attributes:

Name Type Description
exponent float

FBM exponent controlling the spectral slope.

amplitude float

Noise amplitude.

OutputConfig dataclass

OutputConfig(interval_save: float, interval_print: float)

Output file configuration.

Attributes:

Name Type Description
interval_save float

Save interval in physical time [s].

interval_print float

Print progress interval in physical time [s].

PhysicsConfig dataclass

PhysicsConfig(noise: NoiseConfig, viscosity: float, subgrid_model: int, hyperviscosity: HyperviscosityConfig = HyperviscosityConfig())

Physics configuration.

Attributes:

Name Type Description
noise NoiseConfig

NoiseConfig configuration.

viscosity float

The fluid's kinematic viscosity [m^2/s].

subgrid_model int

Subgrid-scale model ID (0-4) for LES.

hyperviscosity HyperviscosityConfig

HyperviscosityConfig for high-k damping.

TimeConfig dataclass

TimeConfig(duration: float, cfl: float, max_step: float)

Time-related parameters for the simulation.

Attributes:

Name Type Description
duration float

Total simulation time [s].

cfl float

Target CFL number for adaptive time stepping.

max_step float

Maximum allowed time step [s].
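
A sketch of composing these dataclasses directly (the import path pyburgers.data_models is assumed from the module name; numeric values other than the documented defaults are illustrative). In normal use, Input builds them from the JSON namelist.

from pyburgers.data_models import (  # module path assumed
    DNSConfig, GridConfig, HyperviscosityConfig, LESConfig,
    NoiseConfig, OutputConfig, PhysicsConfig, TimeConfig,
)

grid = GridConfig(length=6.283185307179586,          # 2*pi, the package default
                  dns=DNSConfig(points=8192),
                  les=LESConfig(points=512))
physics = PhysicsConfig(
    noise=NoiseConfig(exponent=0.75, amplitude=1e-6),
    viscosity=1e-5,                                   # illustrative value
    subgrid_model=1,
    hyperviscosity=HyperviscosityConfig(enabled=False),
)
time = TimeConfig(duration=10.0, cfl=0.5, max_step=1e-3)     # illustrative values
output = OutputConfig(interval_save=0.1, interval_print=0.1)  # illustrative values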

Exceptions

exceptions

Custom exception types for PyBurgers.

PyBurgersError

Bases: Exception

Base class for all custom exceptions in PyBurgers.

NamelistError

Bases: PyBurgersError

Raised for errors found in the namelist configuration.

InvalidMode

Bases: PyBurgersError

Raised when an invalid simulation mode is specified.
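
A brief sketch of catching these exceptions around input loading (the pyburgers.exceptions import path is assumed from the module name):

from pyburgers.exceptions import NamelistError, PyBurgersError
from pyburgers.utils.io.input import Input

try:
    config = Input("namelist.json")
except NamelistError as err:
    print(f"Namelist problem: {err}")     # missing or invalid configuration
except PyBurgersError as err:
    print(f"PyBurgers failure: {err}")    # any other package-specific error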

Utilities

Derivatives

Derivatives(nx: int, dx: float, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Computes spectral derivatives using real FFT (rfft/irfft).

Uses Fourier collocation to compute spatial derivatives of a field. Supports first- through fourth-order derivatives, as well as the dealiased derivative of the squared field (d(u^2)/dx).

Since velocity fields are real-valued, rfft is used for efficiency (~2x speedup, ~50% memory reduction for frequency arrays).

Attributes:

Name Type Description
nx

Number of grid points.

dx

Grid spacing.

nk

Number of rfft output coefficients (nx//2 + 1).

m

Nyquist mode index (nx/2).

fac

Derivative scaling factor (2π/(nx·dx)).

k

Wavenumber array (non-negative frequencies only).

Initialize the Derivatives calculator.

Parameters:

Name Type Description Default
nx int

Number of grid points (must be even).

required
dx float

Grid spacing.

required
fftw_planning str

FFTW planning strategy.

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW.

1
Source code in pyburgers/utils/spectral.py
def __init__(
    self, nx: int, dx: float, fftw_planning: str = "FFTW_MEASURE", fftw_threads: int = 1
) -> None:
    """Initialize the Derivatives calculator.

    Args:
        nx: Number of grid points (must be even).
        dx: Grid spacing.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of threads for FFTW.
    """
    self.nx = nx
    self.dx = dx

    # computed values
    self.nk = self.nx // 2 + 1  # rfft output size
    self.m = self.nx // 2  # Nyquist index
    self.fac = 2 * np.pi / (self.nx * self.dx)

    # wavenumber array for rfft (non-negative frequencies only)
    self.k = np.fft.rfftfreq(self.nx, d=1 / self.nx)
    self.k[self.nk - 1] = 0  # Zero Nyquist mode (last element)

    # Precompute powers for efficiency
    self.fac2 = self.fac**2
    self.fac3 = self.fac**3
    self.fac4 = self.fac**4
    self.k2 = self.k * self.k
    self.k3 = self.k**3
    self.k4 = self.k**4

    # pyfftw arrays for real FFT
    # Physical space: float64, Frequency space: complex128
    self.u = pyfftw.empty_aligned(nx, np.float64)
    self.fu = pyfftw.empty_aligned(self.nk, np.complex128)
    self.fun = pyfftw.empty_aligned(self.nk, np.complex128)
    self.der = pyfftw.empty_aligned(nx, np.float64)

    # Pre-allocated output arrays for derivatives (reused across calls)
    self._out_1 = pyfftw.empty_aligned(nx, np.float64)
    self._out_2 = pyfftw.empty_aligned(nx, np.float64)
    self._out_3 = pyfftw.empty_aligned(nx, np.float64)
    self._out_4 = pyfftw.empty_aligned(nx, np.float64)
    self._out_sq = pyfftw.empty_aligned(nx, np.float64)

    # padded pyfftw arrays for 2x dealiasing
    nx_padded = 2 * self.nx
    nk_padded = nx_padded // 2 + 1  # = nx + 1
    self.up = pyfftw.empty_aligned(nx_padded, np.float64)
    self.fup = pyfftw.empty_aligned(nk_padded, np.complex128)

    # pyfftw functions (auto-detects real<->complex from dtypes)
    self.fft = pyfftw.FFTW(
        self.u, self.fu, direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.ifft = pyfftw.FFTW(
        self.fun,
        self.der,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    # Inverse FFT for Nyquist zeroing: fu -> u
    # Used by core.py to zero the Nyquist mode and transform back
    self.ifft_nyquist = pyfftw.FFTW(
        self.fu, self.u, direction="FFTW_BACKWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.fftp = pyfftw.FFTW(
        self.up,
        self.fup,
        direction="FFTW_FORWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    self.ifftp = pyfftw.FFTW(
        self.fup,
        self.up,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

compute

compute(u: ndarray, order: list[int | str]) -> dict[str, np.ndarray]

Compute spectral derivatives of the input field.

Parameters:

Name Type Description Default
u ndarray

Input velocity field array (real-valued).

required
order list[int | str]

List of derivative orders to compute. Can include integers (1, 2, 3, 4) for standard derivatives or 'sq' for the dealiased derivative of u^2.

required

Returns:

Type Description
dict[str, ndarray]

Dictionary mapping order keys ('1', '2', '3', '4', 'sq') to the corresponding derivative arrays. Arrays are reused internally, so callers should consume values before the next compute() call.
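
Because the returned arrays are internal buffers, copy any result you need to keep across calls. A short sketch (import path per the source location below):

import numpy as np
from pyburgers.utils.spectral import Derivatives

nx = 512
derivs = Derivatives(nx=nx, dx=2 * np.pi / nx)
u = np.sin(derivs.dx * np.arange(nx))             # one sine period on the grid

dudx = derivs.compute(u, order=[1])["1"].copy()   # copy: the buffer is reused
both = derivs.compute(u, order=[2, "sq"])         # views into workspace buffers
d2udx2, dusqdx = both["2"], both["sq"]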

Source code in pyburgers/utils/spectral.py
def compute(self, u: np.ndarray, order: list[int | str]) -> dict[str, np.ndarray]:
    """Compute spectral derivatives of the input field.

    Args:
        u: Input velocity field array (real-valued).
        order: List of derivative orders to compute. Can include
            integers (1, 2, 3, 4) for standard derivatives or 'sq'
            for the dealiased derivative of u^2.

    Returns:
        Dictionary mapping order keys ('1', '2', '3', '4', 'sq') to
        the corresponding derivative arrays. Arrays are reused
        internally, so callers should consume values before the
        next compute() call.
    """
    derivatives = {}

    # copy input array
    self.u[:] = u

    # compute rfft
    self.fft()

    # Only save fu if both "sq" and 4 are requested (sq overwrites fu)
    needs_fu_copy = "sq" in order and 4 in order
    fu_original = self.fu.copy() if needs_fu_copy else self.fu

    # loop through order of derivative from user
    for key in order:
        if key == 1:
            self.fun[:] = 1j * self.k * self.fu
            self.ifft()
            np.multiply(self.fac, self.der, out=self._out_1)
            derivatives["1"] = self._out_1
        if key == 2:
            self.fun[:] = -self.k2 * self.fu
            self.ifft()
            np.multiply(self.fac2, self.der, out=self._out_2)
            derivatives["2"] = self._out_2
        if key == 3:
            self.fun[:] = -1j * self.k3 * self.fu
            self.ifft()
            np.multiply(self.fac3, self.der, out=self._out_3)
            derivatives["3"] = self._out_3
        if key == 4:
            # Use fu_original since "sq" overwrites self.fu
            self.fun[:] = self.k4 * fu_original
            self.ifft()
            np.multiply(self.fac4, self.der, out=self._out_4)
            derivatives["4"] = self._out_4
        if key == "sq":
            # Dealiased computation of d(u^2)/dx using 2x zero-padding
            # With rfft, only non-negative frequencies are stored
            # Zero-pad: copy all nk values to padded array (nk_padded = nx + 1)
            self.fup[:] = 0
            self.fup[0 : self.nk] = self.fu
            # Transform to padded physical space
            self.ifftp()
            # Correct for 2x padding normalization: irfft divides by 2n instead of n
            self.up[:] *= 2
            # Square in physical space
            self.up[:] = self.up**2
            # Transform back to spectral space
            self.fftp()
            # Extract non-aliased modes and correct for 2x array size
            self.fu[:] = self.fup[0 : self.nk] / 2
            self.fu[self.nk - 1] = 0  # Zero Nyquist
            # Compute derivative
            self.fun[:] = 1j * self.k * self.fu
            self.ifft()
            np.multiply(self.fac, self.der, out=self._out_sq)
            derivatives["sq"] = self._out_sq

    return derivatives

Dealias

Dealias(nx: int, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Dealiases nonlinear products using the 3/2 rule.

Computes |x| * x with proper dealiasing to avoid aliasing errors in the nonlinear term of the Burgers equation.

Uses real FFT (rfft/irfft) for efficiency since input fields are real-valued.

Attributes:

Name Type Description
nx

Number of grid points.

nk

Number of rfft output coefficients (nx//2 + 1).

m

Nyquist mode index (nx/2).

Initialize the Dealias calculator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
fftw_planning str

FFTW planning strategy.

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW.

1
Source code in pyburgers/utils/spectral.py
def __init__(self, nx: int, fftw_planning: str = "FFTW_MEASURE", fftw_threads: int = 1) -> None:
    """Initialize the Dealias calculator.

    Args:
        nx: Number of grid points.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of threads for FFTW.
    """
    self.nx = nx
    self.m = self.nx // 2
    self.nk = self.nx // 2 + 1  # rfft output size

    # 3/2 rule padding sizes
    nx_padded = 3 * self.m  # = 3/2 * nx
    nk_padded = nx_padded // 2 + 1

    # pyfftw arrays for real FFT
    self.x = pyfftw.empty_aligned(self.nx, np.float64)
    self.fx = pyfftw.empty_aligned(self.nk, np.complex128)

    # padded pyfftw arrays
    self.xp = pyfftw.empty_aligned(nx_padded, np.float64)
    self.temp = pyfftw.empty_aligned(nx_padded, np.float64)
    self.fxp = pyfftw.empty_aligned(nk_padded, np.complex128)

    # pyfftw functions (auto-detects real<->complex from dtypes)
    self.fft = pyfftw.FFTW(
        self.x, self.fx, direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.ifft = pyfftw.FFTW(
        self.fx, self.x, direction="FFTW_BACKWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.fftp = pyfftw.FFTW(
        self.xp,
        self.fxp,
        direction="FFTW_FORWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    self.ifftp = pyfftw.FFTW(
        self.fxp,
        self.xp,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

compute

compute(x: ndarray) -> np.ndarray

Compute the dealiased product |x| * x.

Parameters:

Name Type Description Default
x ndarray

Input array (real-valued).

required

Returns:

Type Description
ndarray

Dealiased result of |x| * x.
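
A short usage sketch (import path per the source location below); the dealiased result approximates the pointwise product np.abs(x) * x with the aliased high-wavenumber content removed.

import numpy as np
from pyburgers.utils.spectral import Dealias

nx = 512
x = np.random.standard_normal(nx)

dealias = Dealias(nx=nx)
clean = dealias.compute(x)      # |x| * x, dealiased via 3/2-rule padding
naive = np.abs(x) * x           # direct product, kept only for comparison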

Source code in pyburgers/utils/spectral.py
def compute(self, x: np.ndarray) -> np.ndarray:
    """Compute the dealiased product |x| * x.

    Args:
        x: Input array (real-valued).

    Returns:
        Dealiased result of |x| * x.
    """
    # constants
    scale = c.spectral.DEALIAS_SCALE

    # copy input array
    self.x[:] = x

    # compute rfft of x
    self.fft()

    # zero-pad fx (simpler with rfft - just copy to beginning)
    self.fxp[:] = 0
    self.fxp[0 : self.nk] = self.fx

    # compute irfft of fxp
    self.ifftp()

    # store xp in temp
    self.temp[:] = self.xp

    # change x to abs(x)
    self.x[:] = np.abs(x)

    # compute rfft of x
    self.fft()

    # zero-pad fx
    self.fxp[:] = 0
    self.fxp[0 : self.nk] = self.fx

    # compute irfft of fxp
    self.ifftp()

    # multiply xp[x] with xp[abs(x)]
    self.xp[:] = self.xp * self.temp

    # compute rfft of xp
    self.fftp()

    # de-alias fxp (simpler with rfft - just take first nk values)
    self.fx[:] = self.fxp[0 : self.nk]

    # compute irfft of fx
    self.ifft()

    # return de-aliased input
    return scale * self.x.copy()

Filter

Filter(nx: int, nx2: int | None = None, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Spectral filtering operations.

Provides spectral cutoff filtering and downscaling from DNS to LES resolution using Fourier methods.

Uses real FFT (rfft/irfft) for efficiency since input fields are real-valued.

Attributes:

Name Type Description
nx

Number of grid points for the filtered field.

nk

Number of rfft output coefficients (nx//2 + 1).

nx2

Number of grid points for the source field (DNS resolution).

Initialize the Filter.

Parameters:

Name Type Description Default
nx int

Number of grid points for the target (filtered) field.

required
nx2 int | None

Optional number of grid points for source field (used for downscaling from DNS to LES).

None
fftw_planning str

FFTW planning strategy.

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW.

1
Source code in pyburgers/utils/spectral.py
def __init__(
    self,
    nx: int,
    nx2: int | None = None,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
) -> None:
    """Initialize the Filter.

    Args:
        nx: Number of grid points for the target (filtered) field.
        nx2: Optional number of grid points for source field
            (used for downscaling from DNS to LES).
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of threads for FFTW.
    """
    self.nx = nx
    self.nk = self.nx // 2 + 1  # rfft output size

    # pyfftw arrays for real FFT
    self.x = pyfftw.empty_aligned(self.nx, np.float64)
    self.fx = pyfftw.empty_aligned(self.nk, np.complex128)
    self.fxf = pyfftw.zeros_aligned(self.nk, np.complex128)

    # pyfftw functions (auto-detects real<->complex from dtypes)
    self.fft = pyfftw.FFTW(
        self.x, self.fx, direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.ifft = pyfftw.FFTW(
        self.fxf,
        self.x,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    # check for optional larger nx (for downscaling from DNS->LES)
    if nx2:
        self.nx2 = nx2
        self.nk2 = self.nx2 // 2 + 1

        # pyfftw arrays for larger grid
        self.x2 = pyfftw.empty_aligned(self.nx2, np.float64)
        self.fx2 = pyfftw.empty_aligned(self.nk2, np.complex128)

        # pyfftw function for larger grid
        self.fft2 = pyfftw.FFTW(
            self.x2,
            self.fx2,
            direction="FFTW_FORWARD",
            flags=(fftw_planning,),
            threads=fftw_threads,
        )

cutoff

cutoff(x: ndarray, ratio: int) -> np.ndarray

Apply a spectral cutoff filter.

Removes high-frequency modes, keeping only wavenumbers below nx/(2·ratio), the Nyquist limit of an nx/ratio-point grid.

Parameters:

Name Type Description Default
x ndarray

Input array to filter (real-valued).

required
ratio int

Cutoff ratio (keeps wavenumbers below nx/(2·ratio)).

required

Returns:

Type Description
ndarray

Filtered array with high frequencies removed.

Source code in pyburgers/utils/spectral.py
def cutoff(self, x: np.ndarray, ratio: int) -> np.ndarray:
    """Apply a spectral cutoff filter.

    Removes high-frequency modes above nx/ratio.

    Args:
        x: Input array to filter (real-valued).
        ratio: Cutoff ratio (keeps modes up to nx/ratio).

    Returns:
        Filtered array with high frequencies removed.
    """
    # signal size information
    m = int(self.nx / ratio)
    half = int(m / 2)

    # copy input array
    self.x[:] = x

    # compute rfft of x
    self.fft()

    # filter fx (keep low frequencies only)
    # With rfft, only non-negative frequencies exist
    self.fxf[:] = 0
    self.fxf[0:half] = self.fx[0:half]

    # compute irfft of fxf
    self.ifft()

    # return filtered x
    return self.x.copy()

downscale

downscale(x: ndarray, ratio: int) -> np.ndarray

Downscale a field from DNS to LES resolution.

Uses Fourier filtering to project a high-resolution field onto a coarser grid while preserving low-frequency content.

Parameters:

Name Type Description Default
x ndarray

Input array at DNS resolution (real-valued).

required
ratio int

Downscaling ratio (nx2 / nx).

required

Returns:

Type Description
ndarray

Downscaled array at LES resolution.
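
A sketch of projecting a DNS-resolution field onto the LES grid (import path per the source location below):

import numpy as np
from pyburgers.utils.spectral import Filter

nx_dns, nx_les = 8192, 512
u_dns = np.random.standard_normal(nx_dns)      # stand-in for a DNS velocity field

filt = Filter(nx=nx_les, nx2=nx_dns)           # target grid, source grid
u_les = filt.downscale(u_dns, ratio=nx_dns // nx_les)
assert u_les.shape == (nx_les,)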

Source code in pyburgers/utils/spectral.py
def downscale(self, x: np.ndarray, ratio: int) -> np.ndarray:
    """Downscale a field from DNS to LES resolution.

    Uses Fourier filtering to project a high-resolution field
    onto a coarser grid while preserving low-frequency content.

    Args:
        x: Input array at DNS resolution (real-valued).
        ratio: Downscaling ratio (nx2 / nx).

    Returns:
        Downscaled array at LES resolution.
    """
    # zero output array to prevent stale data
    self.fxf[:] = 0

    # copy input array
    self.x2[:] = x

    # signal shape information - keep up to (but not including) LES Nyquist
    half = self.nx // 2

    # compute rfft of larger series
    self.fft2()

    # filter - transfer low frequencies to smaller array
    # With rfft, only non-negative frequencies exist
    self.fxf[0:half] = self.fx2[0:half]
    self.fxf[half] = 0  # Zero Nyquist

    # compute the irfft
    self.ifft()

    # return filtered downscaled field
    # Scale by 1/ratio to preserve amplitude when downscaling
    return (1 / ratio) * self.x.copy()

SpectralWorkspace

SpectralWorkspace(nx: int, dx: float, nx2: int | None = None, noise_beta: float | None = None, noise_nx: int | None = None, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Centralized workspace for all spectral operations.

Manages FFT plans and aligned buffers for a simulation by bundling Derivatives, Dealias, Filter, and FBM noise utilities into a single workspace. All utilities share consistent FFTW settings (planning strategy, threads).

This design eliminates redundant FFT plan creation and ensures that resources are shared efficiently across the simulation.

Attributes:

Name Type Description
derivatives

Derivatives calculator for spatial derivatives.

dealias

Dealias calculator for nonlinear products.

filter

Filter for spectral cutoff and downscaling.

noise

FBM noise generator (only if noise_beta provided).

u ndarray

Primary velocity buffer (float64, size nx).

fu ndarray

Primary Fourier space buffer (complex128, size nx//2+1 for rfft).

Example

DNS workspace with noise:

>>> workspace = SpectralWorkspace(nx=512, dx=0.01, noise_beta=-0.75)
>>> derivs = workspace.derivatives.compute(u, order=[1, 2])
>>> noise = workspace.noise.compute_noise()

LES workspace with filtering and noise at DNS resolution:

>>> workspace_les = SpectralWorkspace(
...     nx=512, dx=0.01, nx2=8192, noise_beta=-0.75, noise_nx=8192
... )
>>> filtered = workspace_les.filter.cutoff(x, ratio=2)

Initialize the spectral workspace.

Parameters:

Name Type Description Default
nx int

Number of grid points for the simulation.

required
dx float

Grid spacing.

required
nx2 int | None

Optional number of grid points for DNS resolution (used in LES mode for downscaling noise from DNS to LES grid). If provided, creates Filter with downscaling capability.

None
noise_beta float | None

Optional FBM exponent for noise generation. If provided, creates FBM noise generator. Typical value is 0.75.

None
noise_nx int | None

Optional number of grid points for noise generation. If not provided and noise_beta is given, uses nx. For LES, set this to nx_dns to generate noise at DNS resolution.

None
fftw_planning str

FFTW planning strategy. Options: - 'FFTW_ESTIMATE': Fast planning, slower execution - 'FFTW_MEASURE': Balanced (default) - 'FFTW_PATIENT': Slow planning, faster execution

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW operations.

1
Source code in pyburgers/utils/spectral_workspace.py
def __init__(
    self,
    nx: int,
    dx: float,
    nx2: int | None = None,
    noise_beta: float | None = None,
    noise_nx: int | None = None,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
) -> None:
    """Initialize the spectral workspace.

    Args:
        nx: Number of grid points for the simulation.
        dx: Grid spacing.
        nx2: Optional number of grid points for DNS resolution
            (used in LES mode for downscaling noise from DNS to LES grid).
            If provided, creates Filter with downscaling capability.
        noise_beta: Optional FBM exponent for noise generation.
            If provided, creates FBM noise generator. Typical value is 0.75.
        noise_nx: Optional number of grid points for noise generation.
            If not provided and noise_beta is given, uses nx.
            For LES, set this to nx_dns to generate noise at DNS resolution.
        fftw_planning: FFTW planning strategy. Options:
            - 'FFTW_ESTIMATE': Fast planning, slower execution
            - 'FFTW_MEASURE': Balanced (default)
            - 'FFTW_PATIENT': Slow planning, faster execution
        fftw_threads: Number of threads for FFTW operations.
    """
    # Store configuration
    self.nx = nx
    self.dx = dx
    self.nx2 = nx2
    self.noise_beta = noise_beta
    self.noise_nx = noise_nx if noise_nx is not None else nx
    self.fftw_planning = fftw_planning
    self.fftw_threads = fftw_threads

    # Initialize all spectral utilities with consistent settings
    self.derivatives = Derivatives(
        nx=nx, dx=dx, fftw_planning=fftw_planning, fftw_threads=fftw_threads
    )

    self.dealias = Dealias(nx=nx, fftw_planning=fftw_planning, fftw_threads=fftw_threads)

    # Always create Filter (SGS models need it for test filtering)
    # If nx2 provided, Filter can also do downscaling from DNS to LES grid
    self.filter = Filter(
        nx=nx,
        nx2=nx2,  # Optional: None for basic filtering, set for downscaling
        fftw_planning=fftw_planning,
        fftw_threads=fftw_threads,
    )

    # Optionally create FBM noise generator
    # If noise_nx differs from nx (LES case), noise is generated at noise_nx
    # resolution and must be filtered down using self.filter.downscale()
    if noise_beta is not None:
        self.noise = FBM(
            beta=noise_beta,
            n_pts=self.noise_nx,
            fftw_planning=fftw_planning,
            fftw_threads=fftw_threads,
        )
    else:
        self.noise = None

    # Expose commonly used buffers for direct access
    # This allows code like: workspace.u[:] = initial_condition
    self.u: np.ndarray = self.derivatives.u
    self.fu: np.ndarray = self.derivatives.fu

__repr__

__repr__() -> str

String representation of the workspace.

Source code in pyburgers/utils/spectral_workspace.py
def __repr__(self) -> str:
    """String representation of the workspace."""
    filter_info = f", nx2={self.nx2}" if self.nx2 else ""
    noise_info = (
        f", noise_beta={self.noise_beta}, noise_nx={self.noise_nx}"
        if self.noise_beta
        else ""
    )
    return (
        f"SpectralWorkspace(nx={self.nx}, dx={self.dx}{filter_info}{noise_info}, "
        f"fftw_planning='{self.fftw_planning}', fftw_threads={self.fftw_threads})"
    )

fftw

FFTW wisdom management for PyBurgers.

This module handles loading and saving FFTW wisdom to disk, which allows FFT plans to be reused across runs for faster initialization.

The wisdom file is stored at ~/.pyburgers_fftw_wisdom.

File locking is used to prevent race conditions when multiple PyBurgers instances access the wisdom file concurrently.
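
A typical wisdom workflow, sketched from the three functions documented below (parameter values are illustrative):

from pyburgers.utils.fftw import load_wisdom, save_wisdom, warmup_fftw_plans

nx_dns, nx_les, beta = 8192, 512, 0.75
planning, threads = "FFTW_MEASURE", 1

loaded, msg = load_wisdom(nx_dns, nx_les, beta, planning, threads)
if not loaded:
    # No usable cache (missing file or parameter mismatch): plan once, then cache
    ok, msg = warmup_fftw_plans(nx_dns, nx_les, beta, planning, threads)
    if ok:
        save_wisdom(nx_dns, nx_les, beta, planning, threads)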

load_wisdom

load_wisdom(nx_dns: int, nx_les: int, noise_beta: float, fftw_planning: str, fftw_threads: int) -> tuple[bool, str]

Load FFTW wisdom from cache file if parameters match.

FFTW wisdom contains optimized FFT plans from previous runs. Loading wisdom speeds up FFT initialization significantly.

This function validates that the cached wisdom was created with the same grid sizes and parameters. If parameters have changed, the wisdom is invalidated and False is returned to trigger re-warmup.

Uses shared file locking to allow concurrent reads while preventing read/write conflicts.

Parameters:

Name Type Description Default
nx_dns int

DNS grid resolution.

required
nx_les int

LES grid resolution.

required
noise_beta float

FBM noise exponent.

required
fftw_planning str

FFTW planning strategy.

required
fftw_threads int

Number of FFTW threads.

required

Returns:

Type Description
tuple[bool, str]

Tuple of (success: bool, message: str) indicating whether wisdom was loaded and a descriptive message about the outcome.

Source code in pyburgers/utils/fftw.py
def load_wisdom(
    nx_dns: int,
    nx_les: int,
    noise_beta: float,
    fftw_planning: str,
    fftw_threads: int,
) -> tuple[bool, str]:
    """Load FFTW wisdom from cache file if parameters match.

    FFTW wisdom contains optimized FFT plans from previous runs.
    Loading wisdom speeds up FFT initialization significantly.

    This function validates that the cached wisdom was created with
    the same grid sizes and parameters. If parameters have changed,
    the wisdom is invalidated and False is returned to trigger re-warmup.

    Uses shared file locking to allow concurrent reads while preventing
    read/write conflicts.

    Args:
        nx_dns: DNS grid resolution.
        nx_les: LES grid resolution.
        noise_beta: FBM noise exponent.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of FFTW threads.

    Returns:
        Tuple of (success: bool, message: str) indicating whether wisdom
        was loaded and a descriptive message about the outcome.
    """
    if not WISDOM_FILE.exists():
        return False, "No wisdom file found"

    try:
        # Acquire shared lock for reading (multiple readers OK)
        with _file_lock(WISDOM_FILE, exclusive=False):
            with open(WISDOM_FILE, "rb") as f:
                data = pickle.load(f)

        # Handle legacy wisdom files (raw wisdom without metadata)
        if not isinstance(data, dict):
            return False, "Legacy wisdom format detected (no metadata)"

        # Extract wisdom and metadata
        wisdom = data.get("wisdom")
        metadata = data.get("metadata", {})

        # Check each parameter and build a detailed message
        mismatches = []
        if metadata.get("nx_dns") != nx_dns:
            mismatches.append(f"nx_dns ({metadata.get('nx_dns')}{nx_dns})")
        if metadata.get("nx_les") != nx_les:
            mismatches.append(f"nx_les ({metadata.get('nx_les')}{nx_les})")
        if metadata.get("noise_beta") != noise_beta:
            mismatches.append(f"noise_beta ({metadata.get('noise_beta')}{noise_beta})")
        if metadata.get("fftw_planning") != fftw_planning:
            mismatches.append(
                f"fftw_planning ({metadata.get('fftw_planning')}{fftw_planning})"
            )
        if metadata.get("fftw_threads") != fftw_threads:
            mismatches.append(f"fftw_threads ({metadata.get('fftw_threads')}{fftw_threads})")

        if mismatches:
            msg = "Parameter mismatch: " + ", ".join(mismatches)
            return False, msg

        # Import the validated wisdom
        pyfftw.import_wisdom(wisdom)
        return True, "Wisdom loaded successfully"

    except TimeoutError as e:
        return False, f"Lock timeout: {e}"
    except Exception as e:
        return False, f"Error loading wisdom: {e}"

save_wisdom

save_wisdom(nx_dns: int, nx_les: int, noise_beta: float, fftw_planning: str, fftw_threads: int) -> bool

Save FFTW wisdom with metadata to cache file.

Saves the accumulated FFT plans along with the grid sizes and parameters used to create them. This allows validation on load to ensure the cached plans match the current configuration.

Uses exclusive file locking to prevent concurrent writes and read/write conflicts.

Parameters:

Name Type Description Default
nx_dns int

DNS grid resolution.

required
nx_les int

LES grid resolution.

required
noise_beta float

FBM noise exponent.

required
fftw_planning str

FFTW planning strategy.

required
fftw_threads int

Number of FFTW threads.

required

Returns:

Type Description
bool

True if wisdom was saved successfully, False otherwise.

Source code in pyburgers/utils/fftw.py
def save_wisdom(
    nx_dns: int,
    nx_les: int,
    noise_beta: float,
    fftw_planning: str,
    fftw_threads: int,
) -> bool:
    """Save FFTW wisdom with metadata to cache file.

    Saves the accumulated FFT plans along with the grid sizes and
    parameters used to create them. This allows validation on load
    to ensure the cached plans match the current configuration.

    Uses exclusive file locking to prevent concurrent writes and
    read/write conflicts.

    Args:
        nx_dns: DNS grid resolution.
        nx_les: LES grid resolution.
        noise_beta: FBM noise exponent.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of FFTW threads.

    Returns:
        True if wisdom was saved successfully, False otherwise.
    """
    try:
        # Package wisdom with metadata
        data = {
            "wisdom": pyfftw.export_wisdom(),
            "metadata": {
                "nx_dns": nx_dns,
                "nx_les": nx_les,
                "noise_beta": noise_beta,
                "fftw_planning": fftw_planning,
                "fftw_threads": fftw_threads,
            },
        }

        # Acquire exclusive lock for writing (blocks all other access)
        with _file_lock(WISDOM_FILE, exclusive=True):
            with open(WISDOM_FILE, "wb") as f:
                pickle.dump(data, f)
        return True
    except TimeoutError:
        # Could not acquire lock - another process is accessing file
        return False
    except Exception:
        return False

warmup_fftw_plans

warmup_fftw_plans(nx_dns: int, nx_les: int, noise_beta: float, fftw_planning: str, fftw_threads: int, domain_length: float = 2 * 3.141592653589793) -> tuple[bool, str]

Generate FFTW plans for common PyBurgers sizes.

This creates representative FFTW plans for DNS/LES grids, filters, dealiasing, and FBM noise so wisdom can be saved once and reused.

If warmup fails (e.g., out of memory, invalid parameters), the function returns False and a descriptive error message. The simulation can still continue - plans will be created on-demand during initialization.

Parameters:

Name Type Description Default
nx_dns int

DNS grid resolution.

required
nx_les int

LES grid resolution.

required
noise_beta float

FBM noise exponent.

required
fftw_planning str

FFTW planning strategy.

required
fftw_threads int

Number of FFTW threads.

required
domain_length float

Length of the periodic domain (default: 2π).

2 * 3.141592653589793

Returns:

Type Description
tuple[bool, str]

Tuple of (success: bool, message: str) indicating whether warmup completed successfully and any relevant diagnostic information.

Source code in pyburgers/utils/fftw.py
def warmup_fftw_plans(
    nx_dns: int,
    nx_les: int,
    noise_beta: float,
    fftw_planning: str,
    fftw_threads: int,
    domain_length: float = 2 * 3.141592653589793,
) -> tuple[bool, str]:
    """Generate FFTW plans for common PyBurgers sizes.

    This creates representative FFTW plans for DNS/LES grids, filters,
    dealiasing, and FBM noise so wisdom can be saved once and reused.

    If warmup fails (e.g., out of memory, invalid parameters), the function
    returns False and a descriptive error message. The simulation can still
    continue - plans will be created on-demand during initialization.

    Args:
        nx_dns: DNS grid resolution.
        nx_les: LES grid resolution.
        noise_beta: FBM noise exponent.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of FFTW threads.
        domain_length: Length of the periodic domain (default: 2π).

    Returns:
        Tuple of (success: bool, message: str) indicating whether warmup
        completed successfully and any relevant diagnostic information.
    """
    from .spectral_workspace import SpectralWorkspace

    try:
        # Warm DNS components using SpectralWorkspace
        if nx_dns > 0:
            try:
                SpectralWorkspace(
                    nx=nx_dns,
                    dx=domain_length / nx_dns,
                    noise_beta=noise_beta,
                    noise_nx=nx_dns,
                    fftw_planning=fftw_planning,
                    fftw_threads=fftw_threads,
                )
            except Exception as e:
                raise RuntimeError(f"Failed to create DNS workspace (nx={nx_dns}): {e}") from e

        # Warm LES components using SpectralWorkspace
        if nx_les > 0:
            try:
                SpectralWorkspace(
                    nx=nx_les,
                    dx=domain_length / nx_les,
                    nx2=nx_dns,
                    noise_beta=noise_beta,
                    noise_nx=nx_dns,
                    fftw_planning=fftw_planning,
                    fftw_threads=fftw_threads,
                )
            except Exception as e:
                raise RuntimeError(
                    f"Failed to create LES workspace (nx={nx_les}, nx2={nx_dns}): {e}"
                ) from e

        return True, "FFTW plans created successfully"

    except Exception as e:
        # Warmup failed, but this is not fatal - plans will be created on demand
        return False, f"Warmup failed: {e}"

FBM

FBM(beta: float, n_pts: int, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Generates fractional Brownian motion (FBM) noise.

FBM noise is used as the stochastic forcing term in the Burgers equation. The noise has a power spectrum that scales as k^(-beta).

Attributes:

Name Type Description
beta

FBM exponent, controls spectral slope.

n_pts

Number of grid points.

fftw_planning

FFTW planning strategy.

fftw_threads

Number of FFTW threads.

nyquist

Nyquist mode index (n/2).

wavenumber

Wavenumber array for spectral coloring.

Initialize the FBM noise generator.

Parameters:

Name Type Description Default
beta float

FBM exponent controlling the spectral slope. Typical value is -0.75 for Burgers turbulence.

required
n_pts int

Number of grid points.

required
fftw_planning str

FFTW planning strategy (default: 'FFTW_MEASURE').

'FFTW_MEASURE'
fftw_threads int

Number of FFTW threads (default: 1).

1
Source code in pyburgers/utils/fbm.py
def __init__(
    self,
    beta: float,
    n_pts: int,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
) -> None:
    """Initialize the FBM noise generator.

    Args:
        beta: FBM exponent controlling the spectral slope. Typical value
            is -0.75 for Burgers turbulence.
        n_pts: Number of grid points.
        fftw_planning: FFTW planning strategy (default: 'FFTW_MEASURE').
        fftw_threads: Number of FFTW threads (default: 1).
    """
    self.beta = beta
    self.n_pts = n_pts
    self.fftw_planning = fftw_planning
    self.fftw_threads = fftw_threads

    # Computed values
    self.nyquist = int(0.5 * n_pts)
    self.nk = self.nyquist + 1
    wavenumber = np.fft.rfftfreq(n_pts, d=1 / n_pts)
    wavenumber[0] = 1  # Avoid /0; DC component is 0 in compute_noise()

    # Precompute spectral coloring coefficients (k^(beta/2))
    # This avoids redundant power computation in compute_noise()
    self._coloring = wavenumber ** (0.5 * beta)

    # pyfftw arrays (real <-> complex rfft/irfft)
    self.x = pyfftw.empty_aligned(n_pts, np.float64)
    self.fx = pyfftw.empty_aligned(self.nk, np.complex128)
    self.fxn = pyfftw.empty_aligned(self.nk, np.complex128)
    self.noise = pyfftw.empty_aligned(n_pts, np.float64)

    # pyfftw functions
    self.fft = pyfftw.FFTW(
        self.x,
        self.fx,
        direction="FFTW_FORWARD",
        flags=(self.fftw_planning,),
        threads=self.fftw_threads,
    )

    self.ifft = pyfftw.FFTW(
        self.fxn,
        self.noise,
        direction="FFTW_BACKWARD",
        flags=(self.fftw_planning,),
        threads=self.fftw_threads,
    )

compute_noise

compute_noise() -> NDArray[np.float64]

Generate a realization of FBM noise.

Creates white noise, transforms to spectral space, applies the FBM spectral coloring (k^(beta/2)), and transforms back.

Returns:

Type Description
NDArray[float64]

Real-valued noise array with FBM spectral characteristics.

Note

The returned array is an internal buffer that will be overwritten on the next call to compute_noise(). If you need to preserve the values, make a copy: noise.copy().
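
A brief sketch showing the buffer caveat in practice (import path per the source location below):

import numpy as np
from pyburgers.utils.fbm import FBM

fbm = FBM(beta=-0.75, n_pts=8192)
first = fbm.compute_noise().copy()   # copy before the next call...
second = fbm.compute_noise()         # ...which overwrites the internal buffer
assert not np.shares_memory(first, second)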

Source code in pyburgers/utils/fbm.py
def compute_noise(self) -> NDArray[np.float64]:
    """Generate a realization of FBM noise.

    Creates white noise, transforms to spectral space, applies
    the FBM spectral coloring (k^(beta/2)), and transforms back.

    Returns:
        Real-valued noise array with FBM spectral characteristics.

    Note:
        The returned array is an internal buffer that will be overwritten
        on the next call to compute_noise(). If you need to preserve the
        values, make a copy: ``noise.copy()``.
    """
    # Generate white noise input (faster than inverse CDF)
    self.x[:] = np.sqrt(self.n_pts) * np.random.standard_normal(self.n_pts)

    # Transform to spectral space
    self.fft()

    # Zero-out DC and Nyquist modes, apply precomputed spectral coloring
    self.fx[0] = 0
    self.fx[self.nyquist] = 0
    self.fxn[:] = self.fx * self._coloring

    # Transform back to physical space
    self.ifft()

    return self.noise

constants

Defines constants for PyBurgers.

This module provides a centralized location for all constants used throughout PyBurgers. Constants are grouped into logical namespaces using frozen dataclasses.

Attributes:

Name Type Description
spectral SpectralConstants

Spectral algorithm constants

sgs SGSConstants

Subgrid-scale model constants

spectral module-attribute

spectral = SpectralConstants()

sgs module-attribute

sgs = SGSConstants()

SpectralConstants dataclass

SpectralConstants(DEALIAS_SCALE: float = 3.0 / 2.0)

Spectral algorithm constants.

Attributes:

Name Type Description
DEALIAS_SCALE float

Scale factor for dealiasing using 3/2 padding rule

DEALIAS_SCALE class-attribute instance-attribute

DEALIAS_SCALE: float = 3.0 / 2.0

SGSConstants dataclass

SGSConstants(TEST_FILTER_RATIO: int = 2, SMAG_CONSTANT_CS: float = 0.16, DEARDORFF_CE: float = 0.7, DEARDORFF_C1: float = 0.1, WONGLILLY_EXPONENT: float = 4.0 / 3.0)

Subgrid-scale model constants.

Attributes:

Name Type Description
TEST_FILTER_RATIO int

Ratio for test filter width in dynamic models

SMAG_CONSTANT_CS float

Smagorinsky constant (Cs) for constant-coefficient model

DEARDORFF_CE float

Deardorff model constant for TKE dissipation

DEARDORFF_C1 float

Deardorff model constant for eddy viscosity

WONGLILLY_EXPONENT float

Exponent for Wong-Lilly dynamic procedure (4/3)

TEST_FILTER_RATIO class-attribute instance-attribute

TEST_FILTER_RATIO: int = 2

SMAG_CONSTANT_CS class-attribute instance-attribute

SMAG_CONSTANT_CS: float = 0.16

DEARDORFF_CE class-attribute instance-attribute

DEARDORFF_CE: float = 0.7

DEARDORFF_C1 class-attribute instance-attribute

DEARDORFF_C1: float = 0.1

WONGLILLY_EXPONENT class-attribute instance-attribute

WONGLILLY_EXPONENT: float = 4.0 / 3.0

logging_helper

Logging configuration for PyBurgers.

This module provides centralized logging setup and helper functions for consistent logging across all PyBurgers modules.

setup_logging

setup_logging(level: str | int = 'INFO', format_string: str | None = None, log_file: str | None = None, file_mode: str = 'w') -> None

Configure the root logger for PyBurgers.

This should be called once at application startup, typically in the main entry point (burgers.py).

Parameters:

Name Type Description Default
level str | int

Log level as string ("DEBUG", "INFO", etc.) or int.

'INFO'
format_string str | None

Optional custom format string. If None, uses DEBUG_FORMAT for DEBUG level, DEFAULT_FORMAT otherwise.

None
log_file str | None

Optional log file path for file logging.

None
file_mode str

File mode for log file handler (default: "w").

'w'
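
A minimal sketch of configuring logging at startup and then requesting a module logger:

from pyburgers.utils.logging_helper import get_logger, setup_logging

setup_logging(level="DEBUG", log_file="pyburgers.log")  # call once at startup
logger = get_logger("DNS")
logger.debug("FFT plans ready")
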
Source code in pyburgers/utils/logging_helper.py
def setup_logging(
    level: str | int = "INFO",
    format_string: str | None = None,
    log_file: str | None = None,
    file_mode: str = "w",
) -> None:
    """Configure the root logger for PyBurgers.

    This should be called once at application startup, typically in
    the main entry point (burgers.py).

    Args:
        level: Log level as string ("DEBUG", "INFO", etc.) or int.
        format_string: Optional custom format string. If None, uses
            DEBUG_FORMAT for DEBUG level, DEFAULT_FORMAT otherwise.
        log_file: Optional log file path for file logging.
        file_mode: File mode for log file handler (default: "w").
    """
    # Convert string level to int if needed
    if isinstance(level, str):
        level = getattr(logging, level.upper(), logging.INFO)

    # Configure root logger
    root_logger = logging.getLogger("PyBurgers")
    root_logger.setLevel(level)

    # Remove existing handlers to avoid duplicates
    root_logger.handlers.clear()

    # Build the formatter: an explicit format_string wins; otherwise fall back
    # to the module-level DEBUG_FORMAT / DEFAULT_FORMAT described in the docstring
    if format_string is None:
        format_string = DEBUG_FORMAT if level == logging.DEBUG else DEFAULT_FORMAT
    log_format = logging.Formatter(format_string)

    # Create console handler
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(log_format)
    handler.addFilter(_SkipProgressFilter())
    root_logger.addHandler(handler)

    # Create progress handler (same format, overwrites current line)
    progress_handler = _ProgressHandler(sys.stdout)
    progress_handler.setLevel(level)
    progress_handler.setFormatter(log_format)
    progress_handler.addFilter(_ProgressOnlyFilter())
    root_logger.addHandler(progress_handler)

    if log_file:
        log_path = Path(log_file).expanduser()
        if log_path.parent != Path("."):
            log_path.parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(log_path, mode=file_mode, encoding="utf-8")
        file_handler.setLevel(level)
        file_handler.setFormatter(log_format)
        file_handler.addFilter(_SkipProgressFilter())
        root_logger.addHandler(file_handler)

    # Prevent propagation to root logger
    root_logger.propagate = False

get_logger

get_logger(name: str) -> logging.Logger

Get a logger for the specified module/class.

Creates a child logger under the PyBurgers namespace for consistent hierarchical logging.

Parameters:

Name Type Description Default
name str

Name for the logger, typically the class or module name. Examples: "DNS", "LES", "SGS", "Input", "Output"

required

Returns:

Type Description
Logger

Configured logger instance.

Example

>>> logger = get_logger("DNS")
>>> logger.info("Starting simulation")
[PyBurgers: DNS]     Starting simulation

Source code in pyburgers/utils/logging_helper.py
def get_logger(name: str) -> logging.Logger:
    """Get a logger for the specified module/class.

    Creates a child logger under the PyBurgers namespace for consistent
    hierarchical logging.

    Args:
        name: Name for the logger, typically the class or module name.
            Examples: "DNS", "LES", "SGS", "Input", "Output"

    Returns:
        Configured logger instance.

    Example:
        >>> logger = get_logger("DNS")
        >>> logger.info("Starting simulation")
        [PyBurgers: DNS]     Starting simulation
    """
    full_name = f"PyBurgers.{name}"

    if full_name not in _loggers:
        _loggers[full_name] = logging.getLogger(full_name)

    return _loggers[full_name]

get_log_level

get_log_level(level_name: str) -> int

Convert a log level name to its integer value.

Parameters:

Name Type Description Default
level_name str

Case-insensitive level name ("debug", "INFO", etc.)

required

Returns:

Type Description
int

Integer log level value.

Raises:

Type Description
ValueError

If level_name is not a valid log level.

Source code in pyburgers/utils/logging_helper.py
def get_log_level(level_name: str) -> int:
    """Convert a log level name to its integer value.

    Args:
        level_name: Case-insensitive level name ("debug", "INFO", etc.)

    Returns:
        Integer log level value.

    Raises:
        ValueError: If level_name is not a valid log level.
    """
    level_name = level_name.upper()
    level = getattr(logging, level_name, None)

    if level is None:
        valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        raise ValueError(
            f"Invalid log level: '{level_name}'. Valid options: {', '.join(valid_levels)}"
        )

    return level

Input/Output

Input

Input(namelist_path: str)

Orchestrates the loading and validation of all model inputs.

This class reads configuration from a JSON namelist file. All data is validated and organized into the appropriate dataclasses.

Attributes:

Name Type Description
time TimeConfig

Dataclass with time-related parameters (duration, cfl, max_step).

physics PhysicsConfig

Dataclass with physics parameters (noise, viscosity).

grid GridConfig

Dataclass with grid configuration (length, DNS, LES).

output OutputConfig

Dataclass with output file configuration.

logging LoggingConfig

Dataclass with logging settings.

fftw FFTWConfig

Dataclass with FFTW configuration.

Initialize the Input class and load all configuration.

Parameters:

Name Type Description Default
namelist_path str

The file path to the JSON namelist.

required

Raises:

Type Description
FileNotFoundError

If the namelist file does not exist.

JSONDecodeError

If the namelist JSON file is malformed.

NamelistError

If required configuration is missing or invalid.

Source code in pyburgers/utils/io/input.py
def __init__(self, namelist_path: str) -> None:
    """Initialize the Input class and load all configuration.

    Args:
        namelist_path: The file path to the JSON namelist.

    Raises:
        FileNotFoundError: If the namelist file does not exist.
        json.JSONDecodeError: If the namelist JSON file is malformed.
        NamelistError: If required configuration is missing or invalid.
    """
    # Set up basic logging before we can read the log level
    setup_logging(level="INFO")
    self.logger: logging.Logger = get_logger("Input")
    self.logger.info("Reading %s", namelist_path)

    namelist_data = self._load_namelist(namelist_path)
    self._validate_namelist(namelist_data)

    # Extract and finalize logging config first so we can adjust log level
    logging_data = namelist_data.get("logging", {})
    log_level = logging_data.get("level", "INFO")
    log_file = logging_data.get("file")
    if log_file == "":
        log_file = None
    self.logging: LoggingConfig = LoggingConfig(level=log_level, file=log_file)
    setup_logging(level=log_level, log_file=log_file)

    # Time configuration
    time_data = namelist_data["time"]
    duration = float(time_data["duration"])
    cfl = float(time_data["cfl"])
    max_step = float(time_data["max_step"])
    self.time: TimeConfig = TimeConfig(duration=duration, cfl=cfl, max_step=max_step)

    # Grid configuration (DNS and LES)
    grid_data = namelist_data["grid"]
    dns_data = grid_data.get("dns", {})
    les_data = grid_data.get("les", {})
    self.grid: GridConfig = GridConfig(
        length=float(grid_data.get("length", math.tau)),
        dns=DNSConfig(points=int(dns_data.get("points", 8192))),
        les=LESConfig(points=int(les_data.get("points", 512))),
    )

    # Physics configuration
    physics_data = namelist_data["physics"]
    noise_data = physics_data.get("noise", {})
    hypervisc_data = physics_data.get("hyperviscosity", {})
    self.physics: PhysicsConfig = PhysicsConfig(
        noise=NoiseConfig(
            exponent=float(noise_data.get("exponent", 0.75)),
            amplitude=float(noise_data.get("amplitude", 1e-6)),
        ),
        viscosity=float(physics_data["viscosity"]),
        subgrid_model=int(physics_data.get("subgrid_model", 1)),
        hyperviscosity=HyperviscosityConfig(
            enabled=bool(hypervisc_data.get("enabled", False)),
        ),
    )

    # Output configuration
    output_data = namelist_data.get("output", {})
    default_interval = 100 * max_step
    self.output: OutputConfig = OutputConfig(
        interval_save=float(output_data.get("interval_save", default_interval)),
        interval_print=float(output_data.get("interval_print", default_interval)),
    )

    # FFTW configuration
    fftw_data = namelist_data.get("fftw", {})
    self.fftw: FFTWConfig = FFTWConfig(
        planning=str(fftw_data.get("planning", "FFTW_MEASURE")),
        threads=int(fftw_data.get("threads", 4)),
    )

    self._log_configuration()
    self.logger.info("--- namelist loaded successfully")

log_level property

log_level: str

Convenience accessor for log level.

fftw_planning property

fftw_planning: str

Convenience accessor for FFTW planning strategy.

fftw_threads property

fftw_threads: int

Convenience accessor for FFTW thread count.

cfl_target property

cfl_target: float

Target CFL number for adaptive time stepping.

max_step property

max_step: float

Maximum allowed time step.

domain_length property

domain_length: float

Convenience accessor for domain length.

viscosity property

viscosity: float

Convenience accessor for viscosity.

hyperviscosity_enabled property

hyperviscosity_enabled: bool

Convenience accessor for hyperviscosity enabled flag.

t_save property

t_save: float

Save interval in seconds.

t_print property

t_print: float

Print interval in seconds.

get_dns_config

get_dns_config() -> dict[str, Any]

Get DNS-specific configuration as a dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary with DNS configuration values.

Source code in pyburgers/utils/io/input.py
def get_dns_config(self) -> dict[str, Any]:
    """Get DNS-specific configuration as a dictionary.

    Returns:
        Dictionary with DNS configuration values.
    """
    return {
        "nx": self.grid.dns.points,
        "cfl": self.time.cfl,
        "max_step": self.time.max_step,
        "viscosity": self.physics.viscosity,
        "noise_beta": self.physics.noise.exponent,
        "noise_amplitude": self.physics.noise.amplitude,
        "t_save": self.output.interval_save,
        "domain_length": self.grid.length,
    }

get_les_config

get_les_config() -> dict[str, Any]

Get LES-specific configuration as a dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary with LES configuration values.

Source code in pyburgers/utils/io/input.py
def get_les_config(self) -> dict[str, Any]:
    """Get LES-specific configuration as a dictionary.

    Returns:
        Dictionary with LES configuration values.
    """
    return {
        "nx": self.grid.les.points,
        "sgs_model": self.physics.subgrid_model,
        "cfl": self.time.cfl,
        "max_step": self.time.max_step,
        "viscosity": self.physics.viscosity,
        "noise_beta": self.physics.noise.exponent,
        "noise_amplitude": self.physics.noise.amplitude,
        "t_save": self.output.interval_save,
        "domain_length": self.grid.length,
    }
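
Example (continuing the construction sketch under __init__ above):

>>> config.get_dns_config()["nx"], config.get_les_config()["nx"]
(8192, 512)
>>> config.get_les_config()["sgs_model"]
1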

Output

Output(outfile: str, sync_interval: int = 100)

Manages the creation and writing of NetCDF output files.

This class handles all aspects of the output file, from its initial creation to writing data at each time step and final closing.

Attributes:

Name Type Description
outfile Dataset

A netCDF4.Dataset object representing the output file.

fields_time dict[str, Any]

A dictionary mapping time-varying field names to their NetCDF variable objects.

fields_static dict[str, Any]

A dictionary mapping static field names to their NetCDF variable objects.

attributes dict[str, dict[str, Any]]

A dictionary defining the metadata (dimensions, units, etc.) for each possible output variable.

Initialize the Output class and create the NetCDF file.

Parameters:

Name Type Description Default
outfile str

The path and name for the output NetCDF file.

required
sync_interval int

Number of saves between disk syncs. Higher values improve performance but risk data loss on crash. Defaults to 100.

100
Source code in pyburgers/utils/io/output.py
def __init__(self, outfile: str, sync_interval: int = 100) -> None:
    """Initialize the Output class and create the NetCDF file.

    Args:
        outfile: The path and name for the output NetCDF file.
        sync_interval: Number of saves between disk syncs. Higher values
            improve performance but risk data loss on crash. Defaults to 100.
    """
    self.logger: logging.Logger = get_logger("Output")
    self.logger.info("Saving output to %s", outfile)
    self.outfile: nc.Dataset = nc.Dataset(outfile, "w")
    self._sync_interval = sync_interval
    self._save_count = 0

    self.outfile.description = "PyBurgers output"
    self.outfile.source = "PyBurgers - 1D Stochastic Burgers Equation Solver"
    self.outfile.history = "Created " + time_module.ctime(time_module.time())

    self.fields_time: dict[str, Any] = {}
    self.fields_static: dict[str, Any] = {}
    self.attributes: dict[str, dict[str, Any]] = {
        "time": {"dimension": ("t",), "long_name": "time", "units": "s"},
        "x": {"dimension": ("x",), "long_name": "x-distance", "units": "m"},
        "u": {"dimension": ("t", "x"), "long_name": "u-component velocity", "units": "m s-1"},
        "tke": {
            "dimension": ("t",),
            "long_name": "turbulence kinetic energy",
            "units": "m2 s-2",
        },
        "tke_sgs": {
            "dimension": ("t",),
            "long_name": "subgrid turbulence kinetic energy",
            "units": "m2 s-2",
        },
        "tke_sgs_prod": {
            "dimension": ("t",),
            "long_name": "subgrid TKE production",
            "units": "m2 s-3",
        },
        "tke_sgs_diff": {
            "dimension": ("t",),
            "long_name": "subgrid TKE diffusion",
            "units": "m2 s-3",
        },
        "tke_sgs_diss": {
            "dimension": ("t",),
            "long_name": "subgrid TKE dissipation",
            "units": "m2 s-3",
        },
        "C_sgs": {"dimension": ("t",), "long_name": "subgrid model coefficient", "units": "--"},
        "diss_sgs": {
            "dimension": ("t",),
            "long_name": "subgrid dissipation",
            "units": "m2 s-3",
        },
        "diss_mol": {
            "dimension": ("t",),
            "long_name": "molecular dissipation",
            "units": "m2 s-3",
        },
        "ens_prod": {"dimension": ("t",), "long_name": "enstrophy production", "units": "s-3"},
        "ens_diss_sgs": {
            "dimension": ("t",),
            "long_name": "subgrid enstrophy dissipation",
            "units": "s-3",
        },
        "ens_diss_mol": {
            "dimension": ("t",),
            "long_name": "molecular enstrophy dissipation",
            "units": "s-3",
        },
    }

set_dims

set_dims(dims: dict[str, int]) -> None

Set the dimensions in the NetCDF output file.

Parameters:

Name Type Description Default
dims dict[str, int]

A dictionary mapping dimension names to their sizes. A size of 0 indicates an unlimited dimension.

required
Source code in pyburgers/utils/io/output.py
def set_dims(self, dims: dict[str, int]) -> None:
    """Set the dimensions in the NetCDF output file.

    Args:
        dims: A dictionary mapping dimension names to their sizes. A size
            of 0 indicates an unlimited dimension.
    """
    for dim, size in dims.items():
        if size == 0:
            self.outfile.createDimension(dim)
        else:
            self.outfile.createDimension(dim, size)
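
Example (illustrative; "t" and "x" are the dimension names used in the attributes table, and output is an existing Output instance):

>>> output.set_dims({"t": 0, "x": 512})   # "t" is unlimited, "x" has 512 points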

set_fields

set_fields(fields: dict[str, Any]) -> None

Create the variables (fields) in the NetCDF output file.

Parameters:

Name Type Description Default
fields dict[str, Any]

A dictionary of fields to be created in the output file.

required
Source code in pyburgers/utils/io/output.py
def set_fields(self, fields: dict[str, Any]) -> None:
    """Create the variables (fields) in the NetCDF output file.

    Args:
        fields: A dictionary of fields to be created in the output file.
    """
    # Add time manually
    dims = self.attributes["time"]["dimension"]
    name = self.attributes["time"]["long_name"]
    units = self.attributes["time"]["units"]
    ncvar = self.outfile.createVariable("time", "f8", dims)
    ncvar.units = units
    ncvar.long_name = name
    self.fields_time["time"] = ncvar

    # Iterate through keys in dictionary
    for field in fields:
        if field not in self.attributes:
            self.logger.warning("Unknown field %s, skipping", field)
            continue
        dims = self.attributes[field]["dimension"]
        units = self.attributes[field]["units"]
        name = self.attributes[field]["long_name"]
        ncvar = self.outfile.createVariable(field, "f8", dims)
        ncvar.units = units
        ncvar.long_name = name
        if "t" in dims:
            self.fields_time[field] = ncvar
        else:
            self.fields_static[field] = ncvar

save

save(fields: dict[str, Any], tidx: int, time: float, initial: bool = False) -> None

Save a snapshot of the simulation state to the output file.

Parameters:

Name Type Description Default
fields dict[str, Any]

A dictionary of data fields to save.

required
tidx int

The time index for the current snapshot.

required
time float

The simulation time in seconds.

required
initial bool

A boolean flag indicating if this is the initial save, in which case static fields are also written (static fields are skipped on all later saves). Defaults to False.

False
Source code in pyburgers/utils/io/output.py
def save(self, fields: dict[str, Any], tidx: int, time: float, initial: bool = False) -> None:
    """Save a snapshot of the simulation state to the output file.

    Args:
        fields: A dictionary of data fields to save.
        tidx: The time index for the current snapshot.
        time: The simulation time in seconds.
        initial: A boolean flag indicating if this is the initial save,
            in which case static fields are also written (they are skipped
            on later saves). Defaults to False.
    """
    # Save static fields only on initial save
    if initial:
        for field, static_var in self.fields_static.items():
            if field in fields:
                static_var[:] = np.asarray(fields[field])

    # Save time-varying fields
    for field, field_var in self.fields_time.items():
        dim = self.attributes[field]["dimension"]
        if len(dim) == 1:
            if field == "time":
                field_var[tidx] = time
            elif field in fields:
                field_var[tidx] = fields[field]
        else:
            if field in fields:
                field_var[tidx, :] = np.real(np.asarray(fields[field]))

    self._save_count += 1
    if self._sync_interval > 0 and self._save_count % self._sync_interval == 0:
        self.outfile.sync()

close

close() -> None

Close the NetCDF output file.

Performs a final sync to ensure all buffered data is written before closing the file.

Source code in pyburgers/utils/io/output.py
def close(self) -> None:
    """Close the NetCDF output file.

    Performs a final sync to ensure all buffered data is written before
    closing the file.
    """
    self.outfile.sync()
    self.outfile.close()
    self.logger.info("Output file closed")

Physics

SGS

SGS(input_obj: Input, spectral: SpectralWorkspace)

Base class for subgrid-scale models.

Provides the interface and factory method for SGS models used in Large-Eddy Simulation. The base class returns zero SGS stress, equivalent to no subgrid model.

Attributes:

Name Type Description
input

Input configuration object.

spectral

SpectralWorkspace with shared spectral utilities.

nx

Number of LES grid points.

dx

Grid spacing.

result dict[str, Any]

Dictionary containing SGS stress (tau) and coefficient.

Initialize the SGS model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object containing simulation parameters.

required
spectral SpectralWorkspace

SpectralWorkspace containing shared spectral utilities. The base SGS class does not use this, but accepts it for consistency with subclasses.

required
Source code in pyburgers/physics/sgs/sgs.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the SGS model.

    Args:
        input_obj: Input configuration object containing simulation
            parameters.
        spectral: SpectralWorkspace containing shared spectral utilities.
            The base SGS class does not use this, but accepts it for
            consistency with subclasses.
    """
    self.input = input_obj
    self.spectral = spectral
    self.nx = input_obj.grid.les.points
    self.dx = input_obj.domain_length / self.nx
    self.fftw_planning = input_obj.fftw_planning
    self.fftw_threads = input_obj.fftw_threads

    # SGS terms dictionary
    self.result: dict[str, Any] = {"tau": np.zeros(self.nx), "coeff": 0}

get_model staticmethod

get_model(model: int, input_obj: Input, spectral: SpectralWorkspace) -> SGS

Factory method to create the appropriate SGS model.

Parameters:

Name Type Description Default
model int

SGS model type identifier:
0 = No model (base SGS)
1 = Constant-coefficient Smagorinsky
2 = Dynamic Smagorinsky
3 = Dynamic Wong-Lilly
4 = Deardorff 1.5-order TKE

required
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace containing shared spectral utilities (Derivatives, Dealias, Filter). REQUIRED for all SGS models.

required

Returns:

Type Description
SGS

Instance of the requested SGS model subclass.

Source code in pyburgers/physics/sgs/sgs.py
@staticmethod
def get_model(model: int, input_obj: Input, spectral: SpectralWorkspace) -> SGS:
    """Factory method to create the appropriate SGS model.

    Args:
        model: SGS model type identifier.
            0 = No model (base SGS)
            1 = Constant-coefficient Smagorinsky
            2 = Dynamic Smagorinsky
            3 = Dynamic Wong-Lilly
            4 = Deardorff 1.5-order TKE
        input_obj: Input configuration object.
        spectral: SpectralWorkspace containing shared spectral utilities
            (Derivatives, Dealias, Filter). REQUIRED for all SGS models.

    Returns:
        Instance of the requested SGS model subclass.
    """
    if model == 0:
        return SGS(input_obj, spectral)
    if model == 1:
        from .sgs_smagcon import SmagConstant

        return SmagConstant(input_obj, spectral)
    if model == 2:
        from .sgs_smagdyn import SmagDynamic

        return SmagDynamic(input_obj, spectral)
    if model == 3:
        from .sgs_wonglilly import WongLilly

        return WongLilly(input_obj, spectral)
    if model == 4:
        from .sgs_deardorff import Deardorff

        return Deardorff(input_obj, spectral)
    raise ValueError(f"--- Unknown SGS model ID: {model}. Valid options: 0-4.")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the SGS stress tensor.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient (du/dx) array.

required
tke_sgs ndarray | float

Subgrid TKE (used by Deardorff model).

required
dt float

Current time step size.

required

Returns:

Type Description
dict[str, Any]

Dictionary containing:
- 'tau': SGS stress array
- 'coeff': Model coefficient
- 'tke_sgs': Updated subgrid TKE (Deardorff only)

Source code in pyburgers/physics/sgs/sgs.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the SGS stress tensor.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient (du/dx) array.
        tke_sgs: Subgrid TKE (used by Deardorff model).
        dt: Current time step size.

    Returns:
        Dictionary containing:
            - 'tau': SGS stress array
            - 'coeff': Model coefficient
            - 'tke_sgs': Updated subgrid TKE (Deardorff only)
    """
    return self.result
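
Example of the common call pattern (a sketch; assumes import numpy as np and the base model returned by SGS.get_model(0, ...) with the default 512 LES points):

>>> u = np.zeros(sgs.nx)
>>> dudx = np.zeros(sgs.nx)
>>> result = sgs.compute(u, dudx, tke_sgs=0.0, dt=1e-4)
>>> result["tau"].shape, result["coeff"]
((512,), 0)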

SmagConstant

SmagConstant(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Constant-coefficient Smagorinsky subgrid-scale model.

The classic Smagorinsky model computes SGS stress as

tau = -2 * Cs^2 * dx^2 * |S| * S

where Cs is a constant (typically 0.16) and S is the strain rate.

Uses the shared spectral workspace for dealiasing operations.

Initialize the constant Smagorinsky model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Dealias utility.

required
Source code in pyburgers/physics/sgs/sgs_smagcon.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the constant Smagorinsky model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Dealias utility.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- Using the Smagorinsky model")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the Smagorinsky SGS stress.

Parameters:

Name Type Description Default
u ndarray

Velocity field array (unused).

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Subgrid TKE (unused in this model).

required
dt float

Current time step size (unused in this model).

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).

Source code in pyburgers/physics/sgs/sgs_smagcon.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the Smagorinsky SGS stress.

    Args:
        u: Velocity field array (unused).
        dudx: Velocity gradient array.
        tke_sgs: Subgrid TKE (unused in this model).
        dt: Current time step size (unused in this model).

    Returns:
        Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).
    """
    # Model constants
    cs = c.sgs.SMAG_CONSTANT_CS
    cs2 = cs**2

    dudx2 = self.spectral.dealias.compute(dudx)

    self.result["tau"] = -2 * cs2 * (self.dx**2) * dudx2
    self.result["coeff"] = np.sqrt(cs2)

    return self.result

SmagDynamic

SmagDynamic(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Dynamic Smagorinsky subgrid-scale model.

Uses the Germano identity to dynamically compute the Smagorinsky coefficient from the resolved velocity field. This removes the need for tuning Cs and allows it to adapt to local flow conditions.

Uses the shared spectral workspace for filtering and dealiasing operations.

Initialize the dynamic Smagorinsky model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Dealias and Filter utilities.

required
Source code in pyburgers/physics/sgs/sgs_smagdyn.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the dynamic Smagorinsky model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Dealias and Filter utilities.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- Using the Dynamic Smagorinsky model")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the dynamic Smagorinsky SGS stress.

Uses test filtering to compute the Leonard stress L and model tensor M, then determines Cs^2 from their contraction.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Subgrid TKE (unused in this model).

required
dt float

Current time step size (unused in this model).

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).

Source code in pyburgers/physics/sgs/sgs_smagdyn.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the dynamic Smagorinsky SGS stress.

    Uses test filtering to compute the Leonard stress L and
    model tensor M, then determines Cs^2 from their contraction.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient array.
        tke_sgs: Subgrid TKE (unused in this model).
        dt: Current time step size (unused in this model).

    Returns:
        Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).
    """
    # Model constants
    ratio = c.sgs.TEST_FILTER_RATIO

    # Leonard stress L11 = <uu> - <u><u>
    uf = self.spectral.filter.cutoff(u, ratio)
    uuf = self.spectral.filter.cutoff(u**2, ratio)
    L11 = uuf - uf * uf

    # Model tensor M11
    dudxf = self.spectral.filter.cutoff(dudx, ratio)
    T = np.abs(dudx) * dudx
    Tf = self.spectral.filter.cutoff(T, ratio)
    M11 = (self.dx**2) * ((ratio**2) * np.abs(dudxf) * dudxf - Tf)

    # Dealiased strain rate
    dudx2 = self.spectral.dealias.compute(dudx)

    # Dynamic Smagorinsky coefficient
    if np.mean(M11 * M11) == 0:
        cs2 = 0
    else:
        cs2 = 0.5 * np.mean(L11 * M11) / np.mean(M11 * M11)
        if cs2 < 0:
            cs2 = 0

    self.result["tau"] = -2 * cs2 * (self.dx**2) * dudx2
    self.result["coeff"] = np.sqrt(cs2)

    return self.result
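
For reference, the least-squares closure in the source above corresponds to (in the plain notation used elsewhere in this reference):

Cs^2 = 0.5 * <L11 * M11> / <M11 * M11>,  clipped so that Cs^2 >= 0

where <.> denotes the domain mean and the test-filter width is ratio * dx.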

WongLilly

WongLilly(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Dynamic Wong-Lilly subgrid-scale model.

A scale-similarity based SGS model that uses a different scaling for the SGS stress compared to the Smagorinsky model. The stress scales as dx^(4/3) rather than dx^2.

Uses the shared spectral workspace for filtering operations.

Initialize the Wong-Lilly model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Filter utility.

required
Source code in pyburgers/physics/sgs/sgs_wonglilly.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the Wong-Lilly model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Filter utility.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- Using the Wong-Lilly model")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the Wong-Lilly SGS stress.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Subgrid TKE (unused in this model).

required
dt float

Current time step size (unused in this model).

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress) and 'coeff' (C_WL).

Source code in pyburgers/physics/sgs/sgs_wonglilly.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the Wong-Lilly SGS stress.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient array.
        tke_sgs: Subgrid TKE (unused in this model).
        dt: Current time step size (unused in this model).

    Returns:
        Dictionary with 'tau' (SGS stress) and 'coeff' (C_WL).
    """

    # Model constants
    ratio = c.sgs.TEST_FILTER_RATIO
    exponent = c.sgs.WONGLILLY_EXPONENT

    # Leonard stress L11
    uf = self.spectral.filter.cutoff(u, ratio)
    uuf = self.spectral.filter.cutoff(u**2, ratio)
    L11 = uuf - uf * uf

    # Model tensor M11 (Wong-Lilly scaling)
    dudxf = self.spectral.filter.cutoff(dudx, ratio)
    ratio_pow = ratio**exponent
    M11 = self.dx**exponent * (1 - ratio_pow) * dudxf

    # Wong-Lilly coefficient
    if np.mean(M11 * M11) == 0:
        cwl = 0
    else:
        cwl = 0.5 * np.mean(L11 * M11) / np.mean(M11 * M11)
        if cwl < 0:
            cwl = 0

    self.result["tau"] = -2 * cwl * (self.dx**exponent) * dudx
    self.result["coeff"] = cwl

    return self.result
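
For reference, assuming c.sgs.WONGLILLY_EXPONENT is the 4/3 stated in the class description, the source above evaluates:

tau = -2 * C_WL * dx^(4/3) * dudx,  C_WL = 0.5 * <L11 * M11> / <M11 * M11>  (clipped at zero)

with M11 = dx^(4/3) * (1 - ratio^(4/3)) * dudxf.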

Deardorff

Deardorff(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Deardorff 1.5-order TKE subgrid-scale model.

A prognostic SGS model that solves a transport equation for subgrid turbulent kinetic energy (TKE). The eddy viscosity is computed from the subgrid TKE as: nu_t = c1 * dx * sqrt(tke_sgs).

Uses the shared spectral workspace for dealiasing and derivative operations.

Initialize the Deardorff TKE model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Dealias and Derivatives utilities.

required
Source code in pyburgers/physics/sgs/sgs_deardorff.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the Deardorff TKE model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Dealias and Derivatives utilities.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- Using the Deardorff TKE model")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the Deardorff SGS stress and update subgrid TKE.

Solves the prognostic TKE equation and computes the SGS stress from the updated subgrid TKE.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Current subgrid TKE array.

required
dt float

Current time step size for TKE tendency integration.

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress), 'coeff' (c1), and 'tke_sgs' (updated subgrid TKE).

Source code in pyburgers/physics/sgs/sgs_deardorff.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the Deardorff SGS stress and update subgrid TKE.

    Solves the prognostic TKE equation and computes the SGS stress
    from the updated subgrid TKE.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient array.
        tke_sgs: Current subgrid TKE array.
        dt: Current time step size for TKE tendency integration.

    Returns:
        Dictionary with 'tau' (SGS stress), 'coeff' (c1),
        and 'tke_sgs' (updated subgrid TKE).
    """

    # Model constants
    ce = c.sgs.DEARDORFF_CE  # Dissipation coefficient
    c1 = c.sgs.DEARDORFF_C1  # Eddy viscosity coefficient

    # Derivatives.compute uses the shared velocity buffer; preserve u.
    u_local = u.copy()

    # Strain rate squared (1D), used for production
    dudx2 = dudx * dudx

    # Compute TKE gradients
    derivs_k = self.spectral.derivatives.compute(tke_sgs, [1])
    dkdx = derivs_k["1"]

    derivs_ku = self.spectral.derivatives.compute(tke_sgs * u_local, [1])
    dkudx = derivs_ku["1"]

    # Eddy viscosity and SGS stress
    tke_sgs_safe = np.maximum(tke_sgs, 0.0)
    Vt = c1 * self.dx * np.sqrt(tke_sgs_safe)
    tau = -2.0 * Vt * dudx

    # TKE diffusion term
    zz = 2 * Vt * dkdx
    derivs_zz = self.spectral.derivatives.compute(zz, [1])
    dzzdx = derivs_zz["1"]

    # TKE tendency: advection + production + diffusion - dissipation
    prod = 2 * Vt * dudx2
    diff = dzzdx
    diss = -ce * (tke_sgs**1.5) / self.dx
    dtke = ((-1 * dkudx) + prod + diff + diss) * dt

    # Update subgrid TKE
    tke_sgs_new = np.maximum(tke_sgs + dtke, 0.0)
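
    # Restore the shared velocity buffer used by Derivatives.compute (preserved in u_local above)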
    self.spectral.u[:] = u_local

    self.result["tau"] = tau
    self.result["coeff"] = c1
    self.result["tke_sgs"] = tke_sgs_new
    self.result["tke_prod"] = float(np.mean(prod))
    self.result["tke_diff"] = float(np.mean(diff))
    self.result["tke_diss"] = float(np.mean(diss))

    return self.result
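
For reference, the tendency integrated in the source above corresponds to (plain notation, with k = tke_sgs and nu_t = c1 * dx * sqrt(k)):

dk/dt = -d(k*u)/dx + 2 * nu_t * (du/dx)^2 + d/dx(2 * nu_t * dk/dx) - ce * k^(3/2) / dx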