Skip to content

API Reference

Overview

PyBurgers: 1D Stochastic Burgers Equation Solver.

A tool for studying Burgers turbulence using DNS and LES.

Core Solvers

Burgers

Burgers(input_obj: Input, output_obj: Output)

Bases: ABC

Abstract base class for Burgers equation solvers.

Provides common functionality for solving the 1D stochastic Burgers equation using Fourier collocation for spatial derivatives and pluggable time integration with CFL-based adaptive time stepping.

Subclasses must implement
  • _get_nx(): Return the grid resolution for this mode
  • _create_spectral_workspace(): Create the spectral workspace for this mode
  • _setup_mode_specific(): Initialize mode-specific components
  • _setup_output_fields(): Configure output fields dictionary
  • _compute_noise(): Generate/process noise for this time step
  • _compute_rhs(): Compute the right-hand side of the equation
  • _save_diagnostics(): Compute and save mode-specific diagnostics

Attributes:

Name Type Description
input

Input configuration object.

output

Output handler for NetCDF writing.

nx

Number of grid points.

dx

Grid spacing.

visc

Kinematic viscosity.

noise_amp

Noise amplitude.

cfl_target

Target CFL number.

max_step

Maximum allowed time step.

t_duration

Total simulation time.

t_save

Output save interval in physical time.

t_print

Progress print interval in physical time.

Initialize the Burgers solver.

Parameters:

Name Type Description Default
input_obj Input

Input configuration containing simulation parameters.

required
output_obj Output

Output handler for writing results to NetCDF.

required
Source code in pyburgers/core.py
def __init__(self, input_obj: Input, output_obj: Output) -> None:
    """Initialize the Burgers solver.

    Args:
        input_obj: Input configuration containing simulation parameters.
        output_obj: Output handler for writing results to NetCDF.
    """
    self.logger: logging.Logger = get_logger(self.mode_name)
    self.logger.info("You are running in %s mode", self.mode_name)

    # Initialize random number generator (seed=None means random each run)
    self.rng = np.random.default_rng(input_obj.physics.noise.seed)

    # Store input/output objects
    self.logger.debug("Reading input settings")
    self.input = input_obj
    self.output = output_obj

    # Extract common configuration
    self.visc = input_obj.viscosity
    self.noise_amp = input_obj.physics.noise.amplitude
    self.noise_beta = input_obj.physics.noise.exponent
    self.fftw_planning = input_obj.fftw_planning
    self.fftw_threads = input_obj.fftw_threads
    self.domain_length = input_obj.domain_length

    # Adaptive time stepping parameters
    self.cfl_target = input_obj.cfl_target
    self.max_step = input_obj.max_step
    self.t_duration = input_obj.time.duration
    self.t_save = input_obj.t_save
    self.t_print = input_obj.t_print

    # Get mode-specific grid resolution
    self.nx = self._get_nx()
    self.dx = self.domain_length / self.nx

    # Create time integrator early so its stability coefficients inform dt limits
    self.integrator = get_integrator(input_obj.numerics.temporal, self.nx)

    # Create spectral workspace and gradient operator before computing dt limits
    # so that the operator's eigenvalue attributes can inform those limits.
    self.spectral = self._create_spectral_workspace()
    self.gradient_op = get_operator(
        input_obj.numerics.spatial, self.nx, self.dx, self.spectral
    )

    # Precompute viscous stability limit (constant for the run).
    # The gradient operator's viscous_eigenvalue (max |k̃²|·dx²) scales the
    # limit correctly across spectral and finite-difference schemes.
    # Use the integrator's dissipative_stability_limit (same coefficient used
    # for hyperviscosity) so the limit is consistent with the chosen scheme.
    _C = self.integrator.dissipative_stability_limit
    self._dt_visc = float('inf') if self.visc == 0 else (
        _C * self.dx**2
        / (self.visc * self.gradient_op.viscous_eigenvalue)
    )

    # Hyperviscosity: coefficient normalized so the Nyquist damping rate
    # (ν₄ · k̃⁴_max) is equal across all spatial schemes.
    #
    # For the spectral scheme, k̃⁴_max = (π/dx)⁴, so ν₄ = dx⁴ gives a
    # Nyquist rate of π⁴.  FD stencils have smaller k̃⁴_max (16 for FD2,
    # 80/3 for FD4), so their coefficients are scaled up by π⁴/λ to
    # compensate:
    #
    #   ν₄ = dx⁴ · π⁴ / λ_hypervisc
    #
    # A beneficial side-effect: the dt stability limit reduces to C/π⁴ for
    # every scheme, so the hyperviscous timestep constraint is
    # scheme-independent.
    self.hypervisc = (
        self.dx**4 * np.pi**4 / self.gradient_op.hyperviscous_eigenvalue
    )
    self._dt_hypervisc = (
        _C * self.dx**4
        / (self.hypervisc * self.gradient_op.hyperviscous_eigenvalue)
    )
    self.logger.info(
        "Hyperviscosity: coefficient = %.2e",
        self.hypervisc,
    )

    # Grid coordinates
    self.x = np.linspace(0, self.domain_length, self.nx, endpoint=False)

    # Reference workspace buffers (zero-copy)
    self.u = self.spectral.u
    self.fu = self.spectral.fu

    # Initialize velocity field to zero
    self.u[:] = 0

    # Common output field
    self.tke = np.zeros(1)

    # Pre-allocated RHS buffer and precomputed noise scaling constant
    self.rhs = np.zeros(self.nx)
    self._noise_scale = np.sqrt(2.0 * self.noise_amp / self.max_step)

    # Step-scoped state for the RHS callable (avoids closure allocation in the loop)
    self._step_dt: float = 0.0
    self._step_noise: np.ndarray | None = None

    # Mode-specific setup (noise, SGS, etc.)
    self._setup_mode_specific()

    # Setup output
    self.output_dims = {"t": 0, "x": self.nx}
    self.output.set_dims(self.output_dims)

    self.output_fields = self._setup_output_fields()
    self.output.set_fields(self.output_fields)

    # Write initial data
    self.output.save(self.output_fields, 0, 0, initial=True)

run

run() -> None

Execute the time integration loop.

Advances the simulation using the configured time integrator with CFL-based adaptive time stepping. Output is written at exact multiples of t_save by clamping dt to hit output times.

Source code in pyburgers/core.py
def run(self) -> None:
    """Execute the time integration loop.

    Advances the simulation using the configured time integrator
    with CFL-based adaptive time stepping. Output is written at
    exact multiples of t_save by clamping dt to hit output times.
    """
    t_current = 0.0
    t_next_save = self.t_save
    t_next_print = self.t_print
    save_idx = 0

    # Sample noise at fixed max_step intervals so that DNS and LES
    # consume the same random sequence regardless of adaptive dt.
    noise = self._compute_noise()
    t_next_noise = self.max_step

    while t_current < self.t_duration - 1e-14:
        dt = self._compute_dt()

        # Clamp to hit next output time or end time exactly
        if t_current + dt >= t_next_save - 1e-14:
            dt = t_next_save - t_current
        if t_current + dt > self.t_duration:
            dt = self.t_duration - t_current
        if dt < 1e-15:
            break

        is_output_step = abs(t_current + dt - t_next_save) <= 1e-12 * max(1.0, t_next_save)

        # Set step-scoped state and delegate to the integrator
        self._step_dt = dt
        self._step_noise = noise
        self.integrator.step(
            self.u, dt, self._rhs_for_step, self.gradient_op.zero_nyquist
        )

        # Post-step hook (e.g., advance prognostic SGS quantities once per step)
        self._post_step(dt)

        t_current += dt

        # Refresh noise at fixed intervals
        if t_current >= t_next_noise - 1e-14:
            noise = self._compute_noise()
            t_next_noise += self.max_step

        # Progress logging
        self._log_progress(t_current, t_next_print)
        if t_current >= t_next_print - 1e-14:
            t_next_print += self.t_print

        # Output at exact save times
        if is_output_step:
            save_idx += 1
            derivatives = self._compute_diagnostic_derivatives()
            t_exact = save_idx * self.t_save
            self._save_diagnostics(derivatives, save_idx, t_exact)
            t_next_save += self.t_save

    if self.logger.isEnabledFor(logging.INFO):
        print()

DNS

DNS(input_obj: Input, output_obj: Output)

Bases: Burgers

Direct numerical simulation solver for the Burgers equation.

Solves the 1D stochastic Burgers equation at full resolution using Fourier collocation for spatial derivatives and pluggable time integration.

This class inherits common functionality from Burgers and implements DNS-specific behavior for noise generation and diagnostics. Uses a SpectralWorkspace with Derivatives and Dealias utilities (no Filter needed for DNS).

Source code in pyburgers/core.py
def __init__(self, input_obj: Input, output_obj: Output) -> None:
    """Initialize the Burgers solver.

    Args:
        input_obj: Input configuration containing simulation parameters.
        output_obj: Output handler for writing results to NetCDF.
    """
    self.logger: logging.Logger = get_logger(self.mode_name)
    self.logger.info("You are running in %s mode", self.mode_name)

    # Initialize random number generator (seed=None means random each run)
    self.rng = np.random.default_rng(input_obj.physics.noise.seed)

    # Store input/output objects
    self.logger.debug("Reading input settings")
    self.input = input_obj
    self.output = output_obj

    # Extract common configuration
    self.visc = input_obj.viscosity
    self.noise_amp = input_obj.physics.noise.amplitude
    self.noise_beta = input_obj.physics.noise.exponent
    self.fftw_planning = input_obj.fftw_planning
    self.fftw_threads = input_obj.fftw_threads
    self.domain_length = input_obj.domain_length

    # Adaptive time stepping parameters
    self.cfl_target = input_obj.cfl_target
    self.max_step = input_obj.max_step
    self.t_duration = input_obj.time.duration
    self.t_save = input_obj.t_save
    self.t_print = input_obj.t_print

    # Get mode-specific grid resolution
    self.nx = self._get_nx()
    self.dx = self.domain_length / self.nx

    # Create time integrator early so its stability coefficients inform dt limits
    self.integrator = get_integrator(input_obj.numerics.temporal, self.nx)

    # Create spectral workspace and gradient operator before computing dt limits
    # so that the operator's eigenvalue attributes can inform those limits.
    self.spectral = self._create_spectral_workspace()
    self.gradient_op = get_operator(
        input_obj.numerics.spatial, self.nx, self.dx, self.spectral
    )

    # Precompute viscous stability limit (constant for the run).
    # The gradient operator's viscous_eigenvalue (max |k̃²|·dx²) scales the
    # limit correctly across spectral and finite-difference schemes.
    # Use the integrator's dissipative_stability_limit (same coefficient used
    # for hyperviscosity) so the limit is consistent with the chosen scheme.
    _C = self.integrator.dissipative_stability_limit
    self._dt_visc = float('inf') if self.visc == 0 else (
        _C * self.dx**2
        / (self.visc * self.gradient_op.viscous_eigenvalue)
    )

    # Hyperviscosity: coefficient normalized so the Nyquist damping rate
    # (ν₄ · k̃⁴_max) is equal across all spatial schemes.
    #
    # For the spectral scheme, k̃⁴_max = (π/dx)⁴, so ν₄ = dx⁴ gives a
    # Nyquist rate of π⁴.  FD stencils have smaller k̃⁴_max (16 for FD2,
    # 80/3 for FD4), so their coefficients are scaled up by π⁴/λ to
    # compensate:
    #
    #   ν₄ = dx⁴ · π⁴ / λ_hypervisc
    #
    # A beneficial side-effect: the dt stability limit reduces to C/π⁴ for
    # every scheme, so the hyperviscous timestep constraint is
    # scheme-independent.
    self.hypervisc = (
        self.dx**4 * np.pi**4 / self.gradient_op.hyperviscous_eigenvalue
    )
    self._dt_hypervisc = (
        _C * self.dx**4
        / (self.hypervisc * self.gradient_op.hyperviscous_eigenvalue)
    )
    self.logger.info(
        "Hyperviscosity: coefficient = %.2e",
        self.hypervisc,
    )

    # Grid coordinates
    self.x = np.linspace(0, self.domain_length, self.nx, endpoint=False)

    # Reference workspace buffers (zero-copy)
    self.u = self.spectral.u
    self.fu = self.spectral.fu

    # Initialize velocity field to zero
    self.u[:] = 0

    # Common output field
    self.tke = np.zeros(1)

    # Pre-allocated RHS buffer and precomputed noise scaling constant
    self.rhs = np.zeros(self.nx)
    self._noise_scale = np.sqrt(2.0 * self.noise_amp / self.max_step)

    # Step-scoped state for the RHS callable (avoids closure allocation in the loop)
    self._step_dt: float = 0.0
    self._step_noise: np.ndarray | None = None

    # Mode-specific setup (noise, SGS, etc.)
    self._setup_mode_specific()

    # Setup output
    self.output_dims = {"t": 0, "x": self.nx}
    self.output.set_dims(self.output_dims)

    self.output_fields = self._setup_output_fields()
    self.output.set_fields(self.output_fields)

    # Write initial data
    self.output.save(self.output_fields, 0, 0, initial=True)

LES

LES(input_obj: Input, output_obj: Output)

Bases: Burgers

Large-eddy simulation solver for the Burgers equation.

Solves the filtered 1D stochastic Burgers equation using Fourier collocation for spatial derivatives, pluggable time integration, and subgrid-scale models for unresolved turbulence.

This class inherits common functionality from Burgers and implements LES-specific behavior including SGS modeling and filtered noise.

Attributes:

Name Type Description
nx_dns

Number of DNS grid points (for noise generation).

sgs_model_id

SGS model type identifier (0-4).

spectral

SpectralWorkspace with derivatives, dealias, and filter.

subgrid

SGS model instance.

tke_sgs

Subgrid TKE (for Deardorff model).

Initialize the LES solver.

Parameters:

Name Type Description Default
input_obj Input

Input configuration containing simulation parameters.

required
output_obj Output

Output handler for writing results to NetCDF.

required
Source code in pyburgers/les.py
def __init__(self, input_obj: Input, output_obj: Output) -> None:
    """Initialize the LES solver.

    Args:
        input_obj: Input configuration containing simulation parameters.
        output_obj: Output handler for writing results to NetCDF.
    """
    # Store LES-specific config before calling parent __init__
    # (needed because _create_spectral_workspace is called during parent init)
    self.nx_dns = input_obj.grid.dns.points
    self.sgs_model_id = input_obj.physics.subgrid_model

    super().__init__(input_obj, output_obj)

Numerical Methods

Temporal Integrators

TemporalIntegrator

TemporalIntegrator(nx: int)

Bases: ABC

Base class for time integration schemes.

Provides the interface and factory method for time integrators used by the Burgers equation solver. Subclasses implement the step method to advance the solution by one timestep.

Attributes:

Name Type Description
nx

Number of grid points.

dissipative_stability_limit float

Maximum stable |λ dt| for real negative eigenvalues (dissipative operators). Used by the core to set the hyperviscous time step limit. Subclasses override this based on their stability region for purely dissipative terms.

Initialize the time integrator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
Source code in pyburgers/numerics/temporal/temporal.py
def __init__(self, nx: int) -> None:
    """Initialize the time integrator.

    Args:
        nx: Number of grid points.
    """
    self.nx = nx

get_integrator staticmethod

get_integrator(scheme: int, nx: int) -> TemporalIntegrator

Factory method to create the appropriate time integrator.

Parameters:

Name Type Description Default
scheme int

Time integration scheme identifier. 1 = Adams-Bashforth 2nd order 2 = Adams-Moulton 2nd order predictor-corrector 3 = Williamson low-storage RK3

required
nx int

Number of grid points for pre-allocating storage arrays.

required

Returns:

Type Description
TemporalIntegrator

Instance of the requested TemporalIntegrator subclass.

Raises:

Type Description
ValueError

If scheme is not a valid option (1-3).

Source code in pyburgers/numerics/temporal/temporal.py
@staticmethod
def get_integrator(scheme: int, nx: int) -> TemporalIntegrator:
    """Factory method to create the appropriate time integrator.

    Args:
        scheme: Time integration scheme identifier.
            1 = Adams-Bashforth 2nd order
            2 = Adams-Moulton 2nd order predictor-corrector
            3 = Williamson low-storage RK3
        nx: Number of grid points for pre-allocating storage arrays.

    Returns:
        Instance of the requested TemporalIntegrator subclass.

    Raises:
        ValueError: If scheme is not a valid option (1-3).
    """
    if scheme == 1:
        from .temporal_ab2 import AB2

        return AB2(nx)
    if scheme == 2:
        from .temporal_am2 import AM2

        return AM2(nx)
    if scheme == 3:
        from .temporal_rk3 import RK3

        return RK3(nx)
    raise ValueError(f"Unknown time integrator ID: {scheme}. Valid options: 1-3.")

step abstractmethod

step(u: ndarray, dt: float, compute_rhs: Callable[[], ndarray], zero_nyquist: Callable[[bool], None]) -> None

Advance the solution by one timestep.

Parameters:

Name Type Description Default
u ndarray

Velocity field array (modified in-place).

required
dt float

Time step size.

required
compute_rhs Callable[[], ndarray]

Callable that computes and returns the RHS vector.

required
zero_nyquist Callable[[bool], None]

Callable that zeros the Nyquist mode. Accepts a restore_physical boolean argument.

required
Source code in pyburgers/numerics/temporal/temporal.py
@abstractmethod
def step(
    self,
    u: np.ndarray,
    dt: float,
    compute_rhs: Callable[[], np.ndarray],
    zero_nyquist: Callable[[bool], None],
) -> None:
    """Advance the solution by one timestep.

    Args:
        u: Velocity field array (modified in-place).
        dt: Time step size.
        compute_rhs: Callable that computes and returns the RHS vector.
        zero_nyquist: Callable that zeros the Nyquist mode.
            Accepts a `restore_physical` boolean argument.
    """

AB2

AB2(nx: int)

Bases: TemporalIntegrator

Adams-Bashforth 2nd-order multistep method.

Uses the variable-step AB2 formula to maintain 2nd-order accuracy under CFL-based adaptive time stepping:

ω = dt_n / dt_{n-1}
u^{n+1} = u^n + dt_n · [(1 + ω/2) · F^n  −  (ω/2) · F^{n-1}]

Reduces to the standard (3/2, -1/2) coefficients when dt is constant. The first timestep is bootstrapped with forward Euler since no previous RHS is available.

Note: variable-step AB2 is weakly unstable when the step-size ratio ω = dt_n / dt_{n-1} varies rapidly between steps. In practice the CFL controller changes dt smoothly (ω ≈ 1), so the instability does not manifest, but users should be aware of this known limitation if adapting the scheme to more aggressive step-size controllers.

AB2 has a stability boundary at |λ dt| = 1 for real negative eigenvalues, but its imaginary stability boundary is very small (~0 for purely imaginary). For advection-diffusion problems, the combined eigenvalue λdt = α + iβ must lie inside a kidney-shaped stability region that is much smaller than the real-axis limit of 1. A limit of 0.2 keeps the viscous dt constraint binding (rather than the CFL) for u_max up to ~0.26, preventing the advective eigenvalue from reaching the unstable π × CFL ≈ 1.26 regime.

Attributes:

Name Type Description
dissipative_stability_limit float

0.2 — keeps the viscous dt limit binding so the combined advection-diffusion eigenvalue stays within AB2's stability region for typical DNS velocities.

_rhs_prev ndarray | None

RHS from the previous timestep, or None before the first step (triggers Euler bootstrap).

_dt_prev float | None

Time step used on the previous timestep, or None before the first step.

Initialize AB2 integrator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
Source code in pyburgers/numerics/temporal/temporal_ab2.py
def __init__(self, nx: int) -> None:
    """Initialize AB2 integrator.

    Args:
        nx: Number of grid points.
    """
    super().__init__(nx)
    self.logger: logging.Logger = get_logger("Temporal")
    self.logger.info("--- using Adams-Bashforth 2nd-order time integration")
    self._rhs_prev: np.ndarray | None = None
    self._dt_prev: float | None = None

step

step(u: ndarray, dt: float, compute_rhs: Callable[[], ndarray], zero_nyquist: Callable[[bool], None]) -> None

Advance the solution by one timestep using AB2.

Parameters:

Name Type Description Default
u ndarray

Velocity field array (modified in-place).

required
dt float

Time step size.

required
compute_rhs Callable[[], ndarray]

Callable that computes and returns the RHS vector.

required
zero_nyquist Callable[[bool], None]

Callable that zeros the Nyquist mode.

required
Source code in pyburgers/numerics/temporal/temporal_ab2.py
def step(
    self,
    u: np.ndarray,
    dt: float,
    compute_rhs: Callable[[], np.ndarray],
    zero_nyquist: Callable[[bool], None],
) -> None:
    """Advance the solution by one timestep using AB2.

    Args:
        u: Velocity field array (modified in-place).
        dt: Time step size.
        compute_rhs: Callable that computes and returns the RHS vector.
        zero_nyquist: Callable that zeros the Nyquist mode.
    """
    rhs = compute_rhs()

    if self._rhs_prev is None:
        # Bootstrap: forward Euler for the first step
        u += dt * rhs
        self._rhs_prev = rhs.copy()
    else:
        if self._dt_prev is None:
            raise RuntimeError("AB2 missing previous dt on non-bootstrap step")
        # Variable-step AB2: ω = dt_n / dt_{n-1}
        # Use _rhs_prev as scratch (overwritten with rhs at the end).
        omega = dt / self._dt_prev
        c0 = 1.0 + 0.5 * omega
        c1 = 0.5 * omega
        self._rhs_prev *= -c1          # in-place: -c1 * F^{n-1}
        self._rhs_prev += c0 * rhs     # in-place: c0*F^n - c1*F^{n-1}
        u += dt * self._rhs_prev
        self._rhs_prev[:] = rhs

    self._dt_prev = dt
    zero_nyquist(restore_physical=True)

AM2

AM2(nx: int)

Bases: TemporalIntegrator

Adams-Bashforth 2nd-order predictor / Adams-Moulton 2nd-order corrector.

Implements a PECE predictor-corrector scheme pairing the variable-step AB2 predictor with the AM2 (trapezoidal rule) corrector:

Predict:  u^* = u^n + dt·[(1 + ω/2)·F^n − (ω/2)·F^{n-1}]   (AB2, variable-step)
Evaluate: F^* = F(u^*)
Correct:  u^{n+1} = u^n + dt/2·(F^* + F^n)                   (AM2)
Evaluate: F^{n+1} = F(u^{n+1})                               [implicit in next step]

The variable-step AB2 predictor maintains 2nd-order accuracy under CFL-based adaptive time stepping. The AM2 corrector (trapezoidal rule) uses constant coefficients since it spans only the current interval and is A-stable as a standalone implicit method.

The scheme reduces to the variable-step Euler bootstrap on the first timestep, matching the AB2 bootstrap strategy.

Stability: The AM2 corrector is A-stable. In PECE mode the effective stability region for real negative eigenvalues is slightly wider than AB2 alone, but the AB2 predictor still constrains the overall stability. The AM2 corrector's A-stability allows a larger dissipative_stability_limit (0.4) than standalone AB2 (0.1), since the corrector damps the parasitic root from the predictor.

Attributes:

Name Type Description
dissipative_stability_limit float

0.4 — the A-stable corrector permits a larger limit than standalone AB2.

_rhs_prev ndarray | None

RHS from the previous timestep (F^{n-1}), or None before the first step (triggers Euler bootstrap).

_rhs_n

Pre-allocated buffer holding F^n, copied from compute_rhs() before the corrector evaluation overwrites the shared RHS buffer.

_u_save

Pre-allocated buffer for u^n, restored before applying the corrector so the predictor modification does not persist.

_dt_prev float | None

Time step used on the previous timestep, or None before the first step.

Initialize AM2 predictor-corrector integrator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
Source code in pyburgers/numerics/temporal/temporal_am2.py
def __init__(self, nx: int) -> None:
    """Initialize AM2 predictor-corrector integrator.

    Args:
        nx: Number of grid points.
    """
    super().__init__(nx)
    self.logger: logging.Logger = get_logger("Temporal")
    self.logger.info("--- using Adams-Moulton 2nd-order time integration")
    self._rhs_prev: np.ndarray | None = None
    self._rhs_n = np.zeros(nx)
    self._u_save = np.zeros(nx)
    self._dt_prev: float | None = None

step

step(u: ndarray, dt: float, compute_rhs: Callable[[], ndarray], zero_nyquist: Callable[[bool], None]) -> None

Advance the solution by one timestep using AB2-AM2 predictor-corrector.

Parameters:

Name Type Description Default
u ndarray

Velocity field array (modified in-place).

required
dt float

Time step size.

required
compute_rhs Callable[[], ndarray]

Callable that computes and returns the RHS vector. Returns a reference to a shared buffer; callers must copy before invoking compute_rhs a second time.

required
zero_nyquist Callable[[bool], None]

Callable that zeros the Nyquist mode. Accepts a restore_physical boolean argument.

required
Source code in pyburgers/numerics/temporal/temporal_am2.py
def step(
    self,
    u: np.ndarray,
    dt: float,
    compute_rhs: Callable[[], np.ndarray],
    zero_nyquist: Callable[[bool], None],
) -> None:
    """Advance the solution by one timestep using AB2-AM2 predictor-corrector.

    Args:
        u: Velocity field array (modified in-place).
        dt: Time step size.
        compute_rhs: Callable that computes and returns the RHS vector.
            Returns a reference to a shared buffer; callers must copy
            before invoking compute_rhs a second time.
        zero_nyquist: Callable that zeros the Nyquist mode.
            Accepts a `restore_physical` boolean argument.
    """
    # Copy F^n before a second compute_rhs call overwrites the shared buffer
    self._rhs_n[:] = compute_rhs()

    if self._rhs_prev is None:
        # Bootstrap: forward Euler for the first step
        u += dt * self._rhs_n
        self._rhs_prev = self._rhs_n.copy()
    else:
        if self._dt_prev is None:
            raise RuntimeError("AM2 missing previous dt on non-bootstrap step")
        # Save u^n; the predictor modifies u in-place
        self._u_save[:] = u

        # AB2 predictor (variable-step): ω = dt_n / dt_{n-1}
        # Reuse _rhs_prev as scratch (overwritten with F^n below).
        omega = dt / self._dt_prev
        c0 = 1.0 + 0.5 * omega
        c1 = 0.5 * omega
        self._rhs_prev *= -c1           # in-place: -c1 · F^{n-1}
        self._rhs_prev += c0 * self._rhs_n  # in-place: c0·F^n - c1·F^{n-1}
        u += dt * self._rhs_prev

        # Must restore physical space so SGS models see Nyquist-zeroed u.
        zero_nyquist(restore_physical=True)

        # Evaluate F^* at the predicted u^* (fu already current, FFT skipped)
        rhs_star = compute_rhs()

        # Zero-allocation AM2 corrector: u^{n+1} = u^n + dt/2 · (F^* + F^n)
        # Reuse _rhs_prev as scratch (consumed by predictor, overwritten below).
        u[:] = self._u_save
        self._rhs_prev[:] = rhs_star
        self._rhs_prev += self._rhs_n
        self._rhs_prev *= 0.5 * dt
        u += self._rhs_prev

        # Store F^n as F^{n-1} for the next step's AB2 predictor
        self._rhs_prev[:] = self._rhs_n

    self._dt_prev = dt
    zero_nyquist(restore_physical=True)

RK3

RK3(nx: int)

Bases: TemporalIntegrator

Williamson (1980) low-storage 3rd-order Runge-Kutta.

Uses two coefficient vectors for a three-stage integration with a single auxiliary storage array Q.

RK3's stability region for real negative eigenvalues extends to |λ dt| ~ 2.5+, so the dissipative_stability_limit of 2.5 is set well above typical max_step · π⁴ values and will not constrain the time step in practice. The limit is retained for interface consistency.

Attributes:

Name Type Description
dissipative_stability_limit float

2.5 — exceeds typical hyperviscous eigenvalue at max_step, so max_step governs instead.

Q

Low-storage register array.

Initialize RK3 integrator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
Source code in pyburgers/numerics/temporal/temporal_rk3.py
def __init__(self, nx: int) -> None:
    """Initialize RK3 integrator.

    Args:
        nx: Number of grid points.
    """
    super().__init__(nx)
    self.logger: logging.Logger = get_logger("Temporal")
    self.logger.info("--- using Williamson RK3 time integration")
    self.Q = np.zeros(nx)

step

step(u: ndarray, dt: float, compute_rhs: Callable[[], ndarray], zero_nyquist: Callable[[bool], None]) -> None

Advance the solution by one timestep using 3-stage RK3.

Parameters:

Name Type Description Default
u ndarray

Velocity field array (modified in-place).

required
dt float

Time step size.

required
compute_rhs Callable[[], ndarray]

Callable that computes and returns the RHS vector.

required
zero_nyquist Callable[[bool], None]

Callable that zeros the Nyquist mode.

required
Source code in pyburgers/numerics/temporal/temporal_rk3.py
def step(
    self,
    u: np.ndarray,
    dt: float,
    compute_rhs: Callable[[], np.ndarray],
    zero_nyquist: Callable[[bool], None],
) -> None:
    """Advance the solution by one timestep using 3-stage RK3.

    Args:
        u: Velocity field array (modified in-place).
        dt: Time step size.
        compute_rhs: Callable that computes and returns the RHS vector.
        zero_nyquist: Callable that zeros the Nyquist mode.
    """
    for stage in range(3):
        rhs = compute_rhs()
        if stage == 0:
            self.Q.fill(0.0)           # avoid 0.0 * NaN poisoning
        else:
            self.Q *= self._A[stage]
        self.Q += rhs                  # in-place
        u += self._B[stage] * dt * self.Q

        # Zero Nyquist; must restore physical space on all stages so
        # SGS models see the Nyquist-zeroed u during intermediate stages.
        zero_nyquist(restore_physical=True)

Spatial Operators

SpatialOperator

SpatialOperator(nx: int, dx: float)

Bases: ABC

Base class for spatial discretization (gradient computation) schemes.

Provides the interface and factory method for gradient operators used by the Burgers equation solver. Subclasses implement the compute method to evaluate spatial derivatives and the zero_nyquist method to enforce spectral constraints (a no-op for finite-difference schemes).

Class attributes encode the scheme's maximum modified-wavenumber magnitudes at the Nyquist frequency, normalised by dx:

viscous_eigenvalue      max |k̃²| · dx²   (for d²/dx²)
hyperviscous_eigenvalue max |k̃⁴| · dx⁴   (for d⁴/dx⁴)

These are used by the core solver to set scheme-appropriate stability limits for the viscous and hyperviscous time step constraints, analogous to how TemporalIntegrator.dissipative_stability_limit works for the time scheme.

Attributes:

Name Type Description
nx

Number of grid points.

dx

Grid spacing.

Initialize the gradient operator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
dx float

Grid spacing.

required
Source code in pyburgers/numerics/spatial/spatial.py
def __init__(self, nx: int, dx: float) -> None:
    """Initialize the gradient operator.

    Args:
        nx: Number of grid points.
        dx: Grid spacing.
    """
    self.nx = nx
    self.dx = dx

get_operator staticmethod

get_operator(scheme: int, nx: int, dx: float, spectral: SpectralWorkspace) -> SpatialOperator

Factory method to create the appropriate gradient operator.

Parameters:

Name Type Description Default
scheme int

Spatial discretization scheme identifier. 1 = 2nd-order central finite differences 2 = 4th-order central finite differences 3 = Spectral (Fourier collocation)

required
nx int

Number of grid points.

required
dx float

Grid spacing.

required
spectral SpectralWorkspace

SpectralWorkspace used for spectral derivatives (scheme=3) and Nyquist zeroing (all schemes).

required

Returns:

Type Description
SpatialOperator

Instance of the requested SpatialOperator subclass.

Raises:

Type Description
ValueError

If scheme is not a valid option (1-3).

Source code in pyburgers/numerics/spatial/spatial.py
@staticmethod
def get_operator(
    scheme: int,
    nx: int,
    dx: float,
    spectral: SpectralWorkspace,
) -> SpatialOperator:
    """Factory method to create the appropriate gradient operator.

    Args:
        scheme: Spatial discretization scheme identifier.
            1 = 2nd-order central finite differences
            2 = 4th-order central finite differences
            3 = Spectral (Fourier collocation)
        nx: Number of grid points.
        dx: Grid spacing.
        spectral: SpectralWorkspace used for spectral derivatives (scheme=3)
            and Nyquist zeroing (all schemes).

    Returns:
        Instance of the requested SpatialOperator subclass.

    Raises:
        ValueError: If scheme is not a valid option (1-3).
    """
    if scheme == 1:
        from .spatial_fd2 import FD2

        return FD2(nx, dx, spectral)
    if scheme == 2:
        from .spatial_fd4 import FD4

        return FD4(nx, dx, spectral)
    if scheme == 3:
        from .spatial_spectral import Spectral

        return Spectral(nx, dx, spectral)
    raise ValueError(f"Unknown spatial scheme ID: {scheme}. Valid options: 1-3.")

compute abstractmethod

compute(u: ndarray, orders: list[int | str]) -> dict[str, np.ndarray]

Compute spatial derivatives of u.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
orders list[int | str]

List of derivative orders to compute. Supported values: 1, 2, 3, 4 (integer derivative orders) and 'sq' (dealiased d(u²)/dx for the nonlinear advection term).

required

Returns:

Type Description
dict[str, ndarray]

Dictionary mapping order keys ('1', '2', '3', '4', 'sq') to

dict[str, ndarray]

derivative arrays.

Source code in pyburgers/numerics/spatial/spatial.py
@abstractmethod
def compute(
    self, u: np.ndarray, orders: list[int | str]
) -> dict[str, np.ndarray]:
    """Compute spatial derivatives of u.

    Args:
        u: Velocity field array.
        orders: List of derivative orders to compute. Supported values:
            1, 2, 3, 4 (integer derivative orders) and 'sq' (dealiased
            d(u²)/dx for the nonlinear advection term).

    Returns:
        Dictionary mapping order keys ('1', '2', '3', '4', 'sq') to
        derivative arrays.
    """

zero_nyquist abstractmethod

zero_nyquist(restore_physical: bool = True) -> None

Zero the Nyquist mode after each integration stage.

For spectral schemes this enforces the spectral constraint on the highest resolved wavenumber. For finite-difference schemes this is a no-op.

Parameters:

Name Type Description Default
restore_physical bool

Whether to transform back to physical space after zeroing (spectral schemes only).

True
Source code in pyburgers/numerics/spatial/spatial.py
@abstractmethod
def zero_nyquist(self, restore_physical: bool = True) -> None:
    """Zero the Nyquist mode after each integration stage.

    For spectral schemes this enforces the spectral constraint on the
    highest resolved wavenumber. For finite-difference schemes this is
    a no-op.

    Args:
        restore_physical: Whether to transform back to physical space
            after zeroing (spectral schemes only).
    """

FD2

FD2(nx: int, dx: float, spectral: SpectralWorkspace)

Bases: SpatialOperator

2nd-order central finite-difference gradient operator.

All derivatives are approximated with 2nd-order accurate central difference stencils. Periodic boundary conditions are enforced implicitly via numpy.roll. The Nyquist constraint does not apply to finite-difference schemes, so zero_nyquist is a no-op.

Modified-wavenumber maxima at Nyquist (ξ = π): viscous_eigenvalue = 4 (max |k̃²|·dx², stencil gives -4/dx²) hyperviscous_eigenvalue = 16 (max |k̃⁴|·dx⁴, stencil gives +16/dx⁴)

Stencils (h = dx): du/dx = (u[i+1] - u[i-1]) / (2h) d²u/dx² = (u[i+1] - 2u[i] + u[i-1]) / h² d³u/dx³ = (u[i+2] - 2u[i+1] + 2u[i-1] - u[i-2]) / (2h³) d⁴u/dx⁴ = (u[i+2] - 4u[i+1] + 6u[i] - 4u[i-1] + u[i-2]) / h⁴ sq = d(u²)/dx via 2nd-order FD applied to v = u²

Source code in pyburgers/numerics/spatial/spatial_fd2.py
def __init__(self, nx: int, dx: float, spectral: SpectralWorkspace) -> None:
    super().__init__(nx, dx)
    self.logger: logging.Logger = get_logger("Spatial")
    self.logger.info("--- using 2nd-order finite-difference spatial discretization")
    self._der = spectral.derivatives

    # Pre-allocated padded buffer (pad=2 on each side) and output arrays
    self._u_pad = np.empty(nx + 4, dtype=np.float64)
    self._v_sq = np.empty(nx, dtype=np.float64)
    self._v_pad = np.empty(nx + 2, dtype=np.float64)
    self._tmp = np.empty(nx, dtype=np.float64)
    self._out_1 = np.empty(nx, dtype=np.float64)
    self._out_2 = np.empty(nx, dtype=np.float64)
    self._out_3 = np.empty(nx, dtype=np.float64)
    self._out_4 = np.empty(nx, dtype=np.float64)
    self._out_sq = np.empty(nx, dtype=np.float64)

compute

compute(u: ndarray, orders: list[int | str]) -> dict[str, np.ndarray]

Compute finite-difference derivatives.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
orders list[int | str]

Derivative orders to compute (1, 2, 3, 4, 'sq').

required

Returns:

Type Description
dict[str, ndarray]

Dictionary of derivative arrays keyed by string order.

Source code in pyburgers/numerics/spatial/spatial_fd2.py
def compute(
    self, u: np.ndarray, orders: list[int | str]
) -> dict[str, np.ndarray]:
    """Compute finite-difference derivatives.

    Args:
        u: Velocity field array.
        orders: Derivative orders to compute (1, 2, 3, 4, 'sq').

    Returns:
        Dictionary of derivative arrays keyed by string order.
    """
    result: dict[str, np.ndarray] = {}
    h = self.dx
    n = self.nx

    # Fill pre-allocated padded buffer (periodic wrap, pad=2)
    u_pad = self._u_pad
    u_pad[2:n + 2] = u
    u_pad[0:2] = u[-2:]
    u_pad[n + 2:n + 4] = u[:2]

    um2, um1 = u_pad[0:n], u_pad[1:n + 1]
    u0 = u_pad[2:n + 2]
    up1, up2 = u_pad[3:n + 3], u_pad[4:n + 4]

    for order in orders:
        if order == 1 or order == "1":
            # (up1 - um1) / (2h)
            np.subtract(up1, um1, out=self._out_1)
            self._out_1 /= 2.0 * h
            result["1"] = self._out_1

        elif order == 2 or order == "2":
            # (up1 + um1 - 2*u0) / h²
            np.add(up1, um1, out=self._out_2)
            np.multiply(2.0, u0, out=self._tmp)
            self._out_2 -= self._tmp
            self._out_2 /= h**2
            result["2"] = self._out_2

        elif order == 3 or order == "3":
            # (up2 - um2) - 2*(up1 - um1), divided by 2h³
            np.subtract(up2, um2, out=self._out_3)
            np.subtract(up1, um1, out=self._tmp)
            self._tmp *= 2.0
            self._out_3 -= self._tmp
            self._out_3 /= 2.0 * h**3
            result["3"] = self._out_3

        elif order == 4 or order == "4":
            # (up2 + um2) - 4*(up1 + um1) + 6*u0, divided by h⁴
            np.add(up2, um2, out=self._out_4)
            np.add(up1, um1, out=self._tmp)
            self._tmp *= 4.0
            self._out_4 -= self._tmp
            np.multiply(6.0, u0, out=self._tmp)
            self._out_4 += self._tmp
            self._out_4 /= h**4
            result["4"] = self._out_4

        elif order == "sq":
            # d(u²)/dx via 2nd-order FD on pre-allocated padded v=u² buffer
            np.multiply(u, u, out=self._v_sq)
            v_pad = self._v_pad
            v_pad[1:n + 1] = self._v_sq
            v_pad[0] = self._v_sq[-1]
            v_pad[n + 1] = self._v_sq[0]
            vm1, vp1 = v_pad[0:n], v_pad[2:n + 2]
            np.subtract(vp1, vm1, out=self._out_sq)
            self._out_sq /= 2.0 * h
            result["sq"] = self._out_sq

    return result

zero_nyquist

zero_nyquist(restore_physical: bool = True) -> None

Zero the Nyquist mode via spectral transform.

FD stencils have zero advective modified wavenumber at Nyquist, so multistep integrators (AB2) can amplify that mode through the parasitic root. Zeroing it each step prevents this.

Only acts when restore_physical=True (final integrator stage), since FD derivatives need physical-space u and intermediate stages don't accumulate parasitic growth.

Parameters:

Name Type Description Default
restore_physical bool

Whether to transform back to physical space.

True
Source code in pyburgers/numerics/spatial/spatial_fd2.py
def zero_nyquist(self, restore_physical: bool = True) -> None:
    """Zero the Nyquist mode via spectral transform.

    FD stencils have zero advective modified wavenumber at Nyquist,
    so multistep integrators (AB2) can amplify that mode through
    the parasitic root. Zeroing it each step prevents this.

    Only acts when restore_physical=True (final integrator stage),
    since FD derivatives need physical-space u and intermediate
    stages don't accumulate parasitic growth.

    Args:
        restore_physical: Whether to transform back to physical space.
    """
    if restore_physical:
        self._der.zero_nyquist(restore_physical=True)

FD4

FD4(nx: int, dx: float, spectral: SpectralWorkspace)

Bases: SpatialOperator

4th-order central finite-difference gradient operator.

All derivatives are approximated with 4th-order accurate central difference stencils. Periodic boundary conditions are enforced implicitly via numpy.roll. The Nyquist constraint does not apply to finite-difference schemes, so zero_nyquist is a no-op.

Modified-wavenumber maxima at Nyquist (ξ = π): viscous_eigenvalue = 16/3 ≈ 5.33 (max |k̃²|·dx², stencil gives -64/(12dx²)) hyperviscous_eigenvalue = 80/3 ≈ 26.67 (max |k̃⁴|·dx⁴, stencil gives 160/(6dx⁴))

Stencils (h = dx): du/dx = (-u[i+2] + 8u[i+1] - 8u[i-1] + u[i-2]) / (12h) d²u/dx² = (-u[i+2] + 16u[i+1] - 30u[i] + 16u[i-1] - u[i-2]) / (12h²) d³u/dx³ = (u[i+3] - 8u[i+2] + 13u[i+1] - 13u[i-1] + 8u[i-2] - u[i-3]) / (8h³) d⁴u/dx⁴ = (-u[i+3] + 12u[i+2] - 39u[i+1] + 56u[i] - 39u[i-1] + 12u[i-2] - u[i-3]) / (6h⁴) sq = d(u²)/dx via 4th-order FD applied to v = u²

Source code in pyburgers/numerics/spatial/spatial_fd4.py
def __init__(self, nx: int, dx: float, spectral: SpectralWorkspace) -> None:
    super().__init__(nx, dx)
    self.logger: logging.Logger = get_logger("Spatial")
    self.logger.info("--- using 4th-order finite-difference spatial discretization")
    self._der = spectral.derivatives

    # Pre-allocated padded buffer (pad=3 on each side) and output arrays
    self._u_pad = np.empty(nx + 6, dtype=np.float64)
    self._v_sq = np.empty(nx, dtype=np.float64)
    self._v_pad = np.empty(nx + 4, dtype=np.float64)
    self._tmp = np.empty(nx, dtype=np.float64)
    self._out_1 = np.empty(nx, dtype=np.float64)
    self._out_2 = np.empty(nx, dtype=np.float64)
    self._out_3 = np.empty(nx, dtype=np.float64)
    self._out_4 = np.empty(nx, dtype=np.float64)
    self._out_sq = np.empty(nx, dtype=np.float64)

compute

compute(u: ndarray, orders: list[int | str]) -> dict[str, np.ndarray]

Compute finite-difference derivatives.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
orders list[int | str]

Derivative orders to compute (1, 2, 3, 4, 'sq').

required

Returns:

Type Description
dict[str, ndarray]

Dictionary of derivative arrays keyed by string order.

Source code in pyburgers/numerics/spatial/spatial_fd4.py
def compute(
    self, u: np.ndarray, orders: list[int | str]
) -> dict[str, np.ndarray]:
    """Compute finite-difference derivatives.

    Args:
        u: Velocity field array.
        orders: Derivative orders to compute (1, 2, 3, 4, 'sq').

    Returns:
        Dictionary of derivative arrays keyed by string order.
    """
    result: dict[str, np.ndarray] = {}
    h = self.dx
    n = self.nx

    # Fill pre-allocated padded buffer (periodic wrap, pad=3)
    u_pad = self._u_pad
    u_pad[3:n + 3] = u
    u_pad[0:3] = u[-3:]
    u_pad[n + 3:n + 6] = u[:3]

    um3, um2, um1 = u_pad[0:n], u_pad[1:n + 1], u_pad[2:n + 2]
    u0 = u_pad[3:n + 3]
    up1, up2, up3 = u_pad[4:n + 4], u_pad[5:n + 5], u_pad[6:n + 6]

    for order in orders:
        if order == 1 or order == "1":
            # 8*(up1 - um1) - (up2 - um2), divided by 12h
            np.subtract(up1, um1, out=self._out_1)
            self._out_1 *= 8.0
            np.subtract(up2, um2, out=self._tmp)
            self._out_1 -= self._tmp
            self._out_1 /= 12.0 * h
            result["1"] = self._out_1

        elif order == 2 or order == "2":
            # 16*(up1 + um1) - (up2 + um2) - 30*u0, divided by 12h²
            np.add(up1, um1, out=self._out_2)
            self._out_2 *= 16.0
            np.add(up2, um2, out=self._tmp)
            self._out_2 -= self._tmp
            np.multiply(30.0, u0, out=self._tmp)
            self._out_2 -= self._tmp
            self._out_2 /= 12.0 * h**2
            result["2"] = self._out_2

        elif order == 3 or order == "3":
            # 13*(up1 - um1) - 8*(up2 - um2) + (up3 - um3), divided by 8h³
            np.subtract(up1, um1, out=self._out_3)
            self._out_3 *= 13.0
            np.subtract(up2, um2, out=self._tmp)
            self._tmp *= 8.0
            self._out_3 -= self._tmp
            np.subtract(up3, um3, out=self._tmp)
            self._out_3 += self._tmp
            self._out_3 /= 8.0 * h**3
            result["3"] = self._out_3

        elif order == 4 or order == "4":
            # 12*(up2 + um2) - 39*(up1 + um1) - (up3 + um3) + 56*u0, divided by 6h⁴
            np.add(up2, um2, out=self._out_4)
            self._out_4 *= 12.0
            np.add(up1, um1, out=self._tmp)
            self._tmp *= 39.0
            self._out_4 -= self._tmp
            np.add(up3, um3, out=self._tmp)
            self._out_4 -= self._tmp
            np.multiply(56.0, u0, out=self._tmp)
            self._out_4 += self._tmp
            self._out_4 /= 6.0 * h**4
            result["4"] = self._out_4

        elif order == "sq":
            # d(u²)/dx via 4th-order FD on pre-allocated padded v=u² buffer
            np.multiply(u, u, out=self._v_sq)
            v_pad = self._v_pad
            v_pad[2:n + 2] = self._v_sq
            v_pad[0:2] = self._v_sq[-2:]
            v_pad[n + 2:n + 4] = self._v_sq[:2]
            vm2, vm1 = v_pad[0:n], v_pad[1:n + 1]
            vp1, vp2 = v_pad[3:n + 3], v_pad[4:n + 4]
            # 8*(vp1 - vm1) - (vp2 - vm2), divided by 12h
            np.subtract(vp1, vm1, out=self._out_sq)
            self._out_sq *= 8.0
            np.subtract(vp2, vm2, out=self._tmp)
            self._out_sq -= self._tmp
            self._out_sq /= 12.0 * h
            result["sq"] = self._out_sq

    return result

zero_nyquist

zero_nyquist(restore_physical: bool = True) -> None

Zero the Nyquist mode via spectral transform.

FD stencils have zero advective modified wavenumber at Nyquist, so multistep integrators (AB2) can amplify that mode through the parasitic root. Zeroing it each step prevents this.

Only acts when restore_physical=True (final integrator stage), since FD derivatives need physical-space u and intermediate stages don't accumulate parasitic growth.

Parameters:

Name Type Description Default
restore_physical bool

Whether to transform back to physical space.

True
Source code in pyburgers/numerics/spatial/spatial_fd4.py
def zero_nyquist(self, restore_physical: bool = True) -> None:
    """Zero the Nyquist mode via spectral transform.

    FD stencils have zero advective modified wavenumber at Nyquist,
    so multistep integrators (AB2) can amplify that mode through
    the parasitic root. Zeroing it each step prevents this.

    Only acts when restore_physical=True (final integrator stage),
    since FD derivatives need physical-space u and intermediate
    stages don't accumulate parasitic growth.

    Args:
        restore_physical: Whether to transform back to physical space.
    """
    if restore_physical:
        self._der.zero_nyquist(restore_physical=True)

Spectral

Spectral(nx: int, dx: float, spectral: SpectralWorkspace)

Bases: SpatialOperator

Spectral (Fourier collocation) gradient operator.

Delegates all derivative computation and Nyquist enforcement to the existing Derivatives object in the SpectralWorkspace. No code is duplicated; this class is a thin adapter.

Initialize the spectral gradient operator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
dx float

Grid spacing.

required
spectral SpectralWorkspace

SpectralWorkspace whose Derivatives object is reused.

required
Source code in pyburgers/numerics/spatial/spatial_spectral.py
def __init__(self, nx: int, dx: float, spectral: SpectralWorkspace) -> None:
    """Initialize the spectral gradient operator.

    Args:
        nx: Number of grid points.
        dx: Grid spacing.
        spectral: SpectralWorkspace whose Derivatives object is reused.
    """
    super().__init__(nx, dx)
    self.logger: logging.Logger = get_logger("Spatial")
    self.logger.info("--- using spectral spatial discretization")
    self._der = spectral.derivatives

compute

compute(u: ndarray, orders: list[int | str]) -> dict[str, np.ndarray]

Compute spectral derivatives via FFT.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
orders list[int | str]

Derivative orders to compute (1, 2, 3, 4, 'sq').

required

Returns:

Type Description
dict[str, ndarray]

Dictionary of derivative arrays keyed by order.

Source code in pyburgers/numerics/spatial/spatial_spectral.py
def compute(
    self, u: np.ndarray, orders: list[int | str]
) -> dict[str, np.ndarray]:
    """Compute spectral derivatives via FFT.

    Args:
        u: Velocity field array.
        orders: Derivative orders to compute (1, 2, 3, 4, 'sq').

    Returns:
        Dictionary of derivative arrays keyed by order.
    """
    return self._der.compute(u, orders)

zero_nyquist

zero_nyquist(restore_physical: bool = True) -> None

Zero the Nyquist mode in spectral space.

Parameters:

Name Type Description Default
restore_physical bool

Whether to transform back to physical space.

True
Source code in pyburgers/numerics/spatial/spatial_spectral.py
def zero_nyquist(self, restore_physical: bool = True) -> None:
    """Zero the Nyquist mode in spectral space.

    Args:
        restore_physical: Whether to transform back to physical space.
    """
    self._der.zero_nyquist(restore_physical)

Data Models

data_models

Core Data Models for PyBurgers.

This module defines the core data structures used throughout PyBurgers. These structures are implemented as Python dataclasses to provide a clear and robust way to manage the model's state and configuration.

DNSConfig dataclass

DNSConfig(points: int)

Direct numerical simulation configuration.

Attributes:

Name Type Description
points int

Number of grid points.

FFTWConfig dataclass

FFTWConfig(planning: str, threads: int)

FFTW parameters.

Attributes:

Name Type Description
planning str

FFTW planning approach.

threads int

Number of threads to use.

LESConfig dataclass

LESConfig(points: int)

Large-eddy simulation configuration.

Attributes:

Name Type Description
points int

Number of grid points.

GridConfig dataclass

GridConfig(length: float, dns: DNSConfig, les: LESConfig)

Grid configurations.

Attributes:

Name Type Description
length float

Domain length (periodic) [m].

dns DNSConfig

Direct numerical simulation grid configuration.

les LESConfig

Large-eddy simulation grid configuration.

LoggingConfig dataclass

LoggingConfig(level: str, file: str | None = None)

Logging settings.

Attributes:

Name Type Description
level str

Logging level for the simulation (e.g., 'info', 'debug').

file str | None

Optional log file path for file logging.

NoiseConfig dataclass

NoiseConfig(exponent: float, amplitude: float, seed: int | None = None)

Noise method parameters.

Attributes:

Name Type Description
exponent float

FBM exponent controlling the spectral slope.

amplitude float

Noise amplitude.

seed int | None

RNG seed for reproducibility. None means a random seed.

OutputConfig dataclass

OutputConfig(interval_save: float, interval_print: float)

Output file configuration.

Attributes:

Name Type Description
interval_save float

Save interval in physical time [s].

interval_print float

Print progress interval in physical time [s].

PhysicsConfig dataclass

PhysicsConfig(noise: NoiseConfig, viscosity: float, subgrid_model: int)

Physics configuration.

Attributes:

Name Type Description
noise NoiseConfig

NoiseConfig configuration.

viscosity float

The fluid's kinematic viscosity [m^2/s].

subgrid_model int

Subgrid-scale model ID (1-4) for LES.

NumericsConfig dataclass

NumericsConfig(temporal: int, spatial: int, cfl: float, max_step: float)

Numerical method selections and time stepping parameters.

Attributes:

Name Type Description
temporal int

Time integration scheme ID (1=AB2, 2=AM2, 3=RK3).

spatial int

Spatial discretization scheme ID (1=FD2, 2=FD4, 3=Spectral).

cfl float

Target CFL number for adaptive time stepping.

max_step float

Maximum allowed time step [s].

TimeConfig dataclass

TimeConfig(duration: float)

Time-related parameters for the simulation.

Attributes:

Name Type Description
duration float

Total simulation time [s].

Exceptions

exceptions

Custom exception types for PyBurgers.

PyBurgersError

Bases: Exception

Base class for all custom exceptions in PyBurgers.

NamelistError

Bases: PyBurgersError

Raised for errors found in the namelist configuration.

InvalidMode

Bases: PyBurgersError

Raised when an invalid simulation mode is specified.

Utilities

Derivatives

Derivatives(nx: int, dx: float, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Computes spectral derivatives using real FFT (rfft/irfft).

Uses Fourier collocation to compute spatial derivatives of a field. Supports first, second, and third order derivatives, as well as the dealiased derivative of the squared field (d(u^2)/dx).

Since velocity fields are real-valued, rfft is used for efficiency (~2x speedup, ~50% memory reduction for frequency arrays).

Attributes:

Name Type Description
nx

Number of grid points.

dx

Grid spacing.

nk

Number of rfft output coefficients (nx//2 + 1).

m

Nyquist mode index (nx/2).

fac

Derivative scaling factor (2pi/(nxdx)).

k

Wavenumber array (non-negative frequencies only).

Initialize the Derivatives calculator.

Parameters:

Name Type Description Default
nx int

Number of grid points (must be even).

required
dx float

Grid spacing.

required
fftw_planning str

FFTW planning strategy.

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW.

1
Source code in pyburgers/utils/spectral.py
def __init__(
    self, nx: int, dx: float, fftw_planning: str = "FFTW_MEASURE", fftw_threads: int = 1
) -> None:
    """Initialize the Derivatives calculator.

    Args:
        nx: Number of grid points (must be even).
        dx: Grid spacing.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of threads for FFTW.
    """
    self.nx = nx
    self.dx = dx

    # computed values
    self.nk = self.nx // 2 + 1  # rfft output size
    self.m = self.nx // 2  # Nyquist index
    self.fac = 2 * np.pi / (self.nx * self.dx)

    # wavenumber array for rfft (non-negative frequencies only)
    self.k = np.fft.rfftfreq(self.nx, d=1 / self.nx)
    self.k[self.nk - 1] = 0  # Zero Nyquist mode (last element)

    # Precompute powers for efficiency
    self.fac2 = self.fac**2
    self.fac3 = self.fac**3
    self.fac4 = self.fac**4
    self.k2 = self.k * self.k
    self.k3 = self.k**3
    self.k4 = self.k**4

    # pyfftw arrays for real FFT
    # Physical space: float64, Frequency space: complex128
    self.u = pyfftw.empty_aligned(nx, np.float64)
    self.fu = pyfftw.empty_aligned(self.nk, np.complex128)
    self.fun = pyfftw.empty_aligned(self.nk, np.complex128)
    self.der = pyfftw.empty_aligned(nx, np.float64)

    # Pre-allocated output arrays for derivatives (reused across calls)
    self._out_1 = pyfftw.empty_aligned(nx, np.float64)
    self._out_2 = pyfftw.empty_aligned(nx, np.float64)
    self._out_3 = pyfftw.empty_aligned(nx, np.float64)
    self._out_4 = pyfftw.empty_aligned(nx, np.float64)
    self._out_sq = pyfftw.empty_aligned(nx, np.float64)

    # Separate forward-FFT buffers for external-array derivatives
    # (e.g., SGS stress tau, subgrid TKE). Avoids save/restore of the
    # primary velocity buffers (self.u / self.fu).
    self._ext_u = pyfftw.empty_aligned(nx, np.float64)
    self._ext_fu = pyfftw.empty_aligned(self.nk, np.complex128)
    self._ext_fft = pyfftw.FFTW(
        self._ext_u, self._ext_fu,
        direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads,
    )

    # padded pyfftw arrays for 3/2-rule dealiasing
    nx_padded = 3 * self.nx // 2
    nk_padded = nx_padded // 2 + 1
    self.up = pyfftw.empty_aligned(nx_padded, np.float64)
    self.fup = pyfftw.empty_aligned(nk_padded, np.complex128)

    # pyfftw functions (auto-detects real<->complex from dtypes)
    self.fft = pyfftw.FFTW(
        self.u, self.fu, direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.ifft = pyfftw.FFTW(
        self.fun,
        self.der,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    # Inverse FFT for Nyquist zeroing: fu -> u
    # Used by core.py to zero the Nyquist mode and transform back
    self.ifft_nyquist = pyfftw.FFTW(
        self.fu, self.u, direction="FFTW_BACKWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.fftp = pyfftw.FFTW(
        self.up,
        self.fup,
        direction="FFTW_FORWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    self.ifftp = pyfftw.FFTW(
        self.fup,
        self.up,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

compute

compute(u: ndarray, orders: list[int | str]) -> dict[str, np.ndarray]

Compute spectral derivatives of the input field.

When u is not the internal velocity buffer (self.u), a separate FFT buffer pair is used so the primary velocity state is never touched.

Parameters:

Name Type Description Default
u ndarray

Input field array (real-valued).

required
orders list[int | str]

List of derivative orders to compute. Can include integers (1, 2, 3, 4) for standard derivatives or 'sq' for the dealiased derivative of u^2.

required

Returns:

Type Description
dict[str, ndarray]

Dictionary mapping order keys ('1', '2', '3', '4', 'sq') to

dict[str, ndarray]

the corresponding derivative arrays. Arrays are reused

dict[str, ndarray]

internally, so callers should consume values before the

dict[str, ndarray]

next compute() call.

Source code in pyburgers/utils/spectral.py
def compute(self, u: np.ndarray, orders: list[int | str]) -> dict[str, np.ndarray]:
    """Compute spectral derivatives of the input field.

    When ``u`` is not the internal velocity buffer (``self.u``),
    a separate FFT buffer pair is used so the primary velocity
    state is never touched.

    Args:
        u: Input field array (real-valued).
        orders: List of derivative orders to compute. Can include
            integers (1, 2, 3, 4) for standard derivatives or 'sq'
            for the dealiased derivative of u^2.

    Returns:
        Dictionary mapping order keys ('1', '2', '3', '4', 'sq') to
        the corresponding derivative arrays. Arrays are reused
        internally, so callers should consume values before the
        next compute() call.
    """
    derivatives = {}

    # For external arrays (e.g., tau, tke_sgs), use the separate
    # FFT buffers so the primary velocity state is undisturbed.
    if u is not self.u:
        self._ext_u[:] = u
        self._ext_fft()
        fu = self._ext_fu
    else:
        self.u[:] = u
        self.fft()
        fu = self.fu

    # Loop through requested derivative orders
    for key in orders:
        if key == 1 or key == "1":
            np.multiply(self.k, fu, out=self.fun)
            self.fun *= 1j
            self.ifft()
            np.multiply(self.fac, self.der, out=self._out_1)
            derivatives["1"] = self._out_1
        elif key == 2 or key == "2":
            np.multiply(self.k2, fu, out=self.fun)
            self.fun *= -1
            self.ifft()
            np.multiply(self.fac2, self.der, out=self._out_2)
            derivatives["2"] = self._out_2
        elif key == 3 or key == "3":
            np.multiply(self.k3, fu, out=self.fun)
            self.fun *= -1j
            self.ifft()
            np.multiply(self.fac3, self.der, out=self._out_3)
            derivatives["3"] = self._out_3
        elif key == 4 or key == "4":
            np.multiply(self.k4, fu, out=self.fun)
            self.ifft()
            np.multiply(self.fac4, self.der, out=self._out_4)
            derivatives["4"] = self._out_4
        elif key == "sq":
            # Dealiased computation of d(u^2)/dx using 3/2-rule zero-padding
            # With rfft, only non-negative frequencies are stored
            self.fup[:] = 0
            self.fup[0 : self.nk] = fu
            # Transform to padded physical space
            self.ifftp()
            # Correct for 3/2 padding normalization: irfft divides by 3n/2 instead of n
            self.up *= 1.5
            # Square in physical space
            np.square(self.up, out=self.up)
            # Transform back to spectral space
            self.fftp()
            # Truncate to original modes and differentiate; self.k zeros Nyquist
            np.multiply(self.k, self.fup[0 : self.nk], out=self.fun)
            self.fun *= (1j / 1.5)
            self.ifft()
            np.multiply(self.fac, self.der, out=self._out_sq)
            derivatives["sq"] = self._out_sq

    return derivatives

zero_nyquist

zero_nyquist(restore_physical: bool = True) -> None

FFT self.u, zero the Nyquist mode, and restore physical space.

Called after each integration stage to enforce the Nyquist constraint. Performs FFT, zeros the Nyquist mode, and IFFTs back to physical space.

Parameters:

Name Type Description Default
restore_physical bool

Ignored; always restores physical space after zeroing. Retained for interface compatibility with SpatialOperator.

True
Source code in pyburgers/utils/spectral.py
def zero_nyquist(self, restore_physical: bool = True) -> None:
    """FFT self.u, zero the Nyquist mode, and restore physical space.

    Called after each integration stage to enforce the Nyquist constraint.
    Performs FFT, zeros the Nyquist mode, and IFFTs back to physical space.

    Args:
        restore_physical: Ignored; always restores physical space after
            zeroing. Retained for interface compatibility with SpatialOperator.
    """
    self.fft()
    self.fu[self.m] = 0
    self.ifft_nyquist()

Dealias

Dealias(nx: int, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1, *, shared_der: Derivatives | None = None)

Dealiases nonlinear products using the 3/2 rule.

Computes |x| * x with proper dealiasing to avoid aliasing errors in the nonlinear term of the Burgers equation.

Uses real FFT (rfft/irfft) for efficiency since input fields are real-valued.

Attributes:

Name Type Description
nx

Number of grid points.

nk

Number of rfft output coefficients (nx//2 + 1).

m

Nyquist mode index (nx/2).

Initialize the Dealias calculator.

Parameters:

Name Type Description Default
nx int

Number of grid points.

required
fftw_planning str

FFTW planning strategy.

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW.

1
shared_der Derivatives | None

Optional Derivatives instance whose padded FFT plans and buffers are reused instead of allocating new ones. When provided, the padded-forward and padded-inverse plans are shared. Callers must ensure Dealias.compute() and Derivatives.compute('sq') are not called concurrently.

None
Source code in pyburgers/utils/spectral.py
def __init__(
    self,
    nx: int,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
    *,
    shared_der: Derivatives | None = None,
) -> None:
    """Initialize the Dealias calculator.

    Args:
        nx: Number of grid points.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of threads for FFTW.
        shared_der: Optional Derivatives instance whose padded FFT plans
            and buffers are reused instead of allocating new ones. When
            provided, the padded-forward and padded-inverse plans are
            shared. Callers must ensure Dealias.compute() and
            Derivatives.compute('sq') are not called concurrently.
    """
    self.nx = nx
    self.m = self.nx // 2
    self.nk = self.nx // 2 + 1  # rfft output size

    # 3/2 rule padding sizes
    nx_padded = 3 * self.m  # = 3/2 * nx
    nk_padded = nx_padded // 2 + 1

    # pyfftw arrays for real FFT
    self.x = pyfftw.empty_aligned(self.nx, np.float64)
    self.fx = pyfftw.empty_aligned(self.nk, np.complex128)

    # temp is Dealias-private scratch space — always allocated
    self.temp = pyfftw.empty_aligned(nx_padded, np.float64)

    # pyfftw functions (auto-detects real<->complex from dtypes)
    self.fft = pyfftw.FFTW(
        self.x, self.fx, direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.ifft = pyfftw.FFTW(
        self.fx, self.x, direction="FFTW_BACKWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    if shared_der is not None:
        # Reuse padded buffers and plans from the Derivatives instance
        self.xp = shared_der.up
        self.fxp = shared_der.fup
        self.fftp = shared_der.fftp
        self.ifftp = shared_der.ifftp
    else:
        # Allocate own padded buffers and plans
        self.xp = pyfftw.empty_aligned(nx_padded, np.float64)
        self.fxp = pyfftw.empty_aligned(nk_padded, np.complex128)

        self.fftp = pyfftw.FFTW(
            self.xp,
            self.fxp,
            direction="FFTW_FORWARD",
            flags=(fftw_planning,),
            threads=fftw_threads,
        )

        self.ifftp = pyfftw.FFTW(
            self.fxp,
            self.xp,
            direction="FFTW_BACKWARD",
            flags=(fftw_planning,),
            threads=fftw_threads,
        )

compute

compute(x: ndarray) -> np.ndarray

Compute the dealiased product |x| * x.

Parameters:

Name Type Description Default
x ndarray

Input array (real-valued).

required

Returns:

Type Description
ndarray

Dealiased result of |x| * x. The returned array is an

ndarray

internal buffer; copy it if you need to preserve the values

ndarray

across calls.

Source code in pyburgers/utils/spectral.py
def compute(self, x: np.ndarray) -> np.ndarray:
    """Compute the dealiased product |x| * x.

    Args:
        x: Input array (real-valued).

    Returns:
        Dealiased result of |x| * x. The returned array is an
        internal buffer; copy it if you need to preserve the values
        across calls.
    """
    scale = c.spectral.DEALIAS_SCALE

    self.x[:] = x
    self.fft()

    # Zero-pad fx
    self.fxp[:] = 0
    self.fxp[0 : self.nk] = self.fx
    self.ifftp()

    # Store padded x in temp
    self.temp[:] = self.xp

    # Compute padded |x|
    np.abs(x, out=self.x)
    self.fft()
    self.fxp[:] = 0
    self.fxp[0 : self.nk] = self.fx
    self.ifftp()

    # Multiply x * |x| in padded space
    np.multiply(self.xp, self.temp, out=self.xp)

    # Transform back and de-alias
    self.fftp()
    self.fx[:] = self.fxp[0 : self.nk]
    self.ifft()

    self.x *= scale
    return self.x

Filter

Filter(nx: int, nx2: int | None = None, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1)

Spectral filtering operations.

Provides spectral cutoff filtering and downscaling from DNS to LES resolution using Fourier methods.

Uses real FFT (rfft/irfft) for efficiency since input fields are real-valued.

Attributes:

Name Type Description
nx

Number of grid points for the filtered field.

nk

Number of rfft output coefficients (nx//2 + 1).

nx2

Number of grid points for the source field (DNS resolution).

Initialize the Filter.

Parameters:

Name Type Description Default
nx int

Number of grid points for the target (filtered) field.

required
nx2 int | None

Optional number of grid points for source field (used for downscaling from DNS to LES).

None
fftw_planning str

FFTW planning strategy.

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW.

1
Source code in pyburgers/utils/spectral.py
def __init__(
    self,
    nx: int,
    nx2: int | None = None,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
) -> None:
    """Initialize the Filter.

    Args:
        nx: Number of grid points for the target (filtered) field.
        nx2: Optional number of grid points for source field
            (used for downscaling from DNS to LES).
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of threads for FFTW.
    """
    self.nx = nx
    self.nk = self.nx // 2 + 1  # rfft output size

    # pyfftw arrays for real FFT
    self.x = pyfftw.empty_aligned(self.nx, np.float64)
    self.fx = pyfftw.empty_aligned(self.nk, np.complex128)
    self.fxf = pyfftw.zeros_aligned(self.nk, np.complex128)

    # pyfftw functions (auto-detects real<->complex from dtypes)
    self.fft = pyfftw.FFTW(
        self.x, self.fx, direction="FFTW_FORWARD", flags=(fftw_planning,), threads=fftw_threads
    )

    self.ifft = pyfftw.FFTW(
        self.fxf,
        self.x,
        direction="FFTW_BACKWARD",
        flags=(fftw_planning,),
        threads=fftw_threads,
    )

    # check for optional larger nx (for downscaling from DNS->LES)
    if nx2:
        self.nx2 = nx2
        self.nk2 = self.nx2 // 2 + 1

        # pyfftw arrays for larger grid
        self.x2 = pyfftw.empty_aligned(self.nx2, np.float64)
        self.fx2 = pyfftw.empty_aligned(self.nk2, np.complex128)

        # pyfftw function for larger grid
        self.fft2 = pyfftw.FFTW(
            self.x2,
            self.fx2,
            direction="FFTW_FORWARD",
            flags=(fftw_planning,),
            threads=fftw_threads,
        )

cutoff

cutoff(x: ndarray, ratio: int, out: ndarray | None = None) -> np.ndarray

Apply a spectral cutoff filter.

Removes high-frequency modes above nx/ratio.

Parameters:

Name Type Description Default
x ndarray

Input array to filter (real-valued).

required
ratio int

Cutoff ratio (keeps modes up to nx/ratio).

required
out ndarray | None

Optional pre-allocated output array. If provided, the result is written here instead of allocating a new array.

None

Returns:

Type Description
ndarray

Filtered array with high frequencies removed.

Source code in pyburgers/utils/spectral.py
def cutoff(
    self, x: np.ndarray, ratio: int, out: np.ndarray | None = None
) -> np.ndarray:
    """Apply a spectral cutoff filter.

    Removes high-frequency modes above nx/ratio.

    Args:
        x: Input array to filter (real-valued).
        ratio: Cutoff ratio (keeps modes up to nx/ratio).
        out: Optional pre-allocated output array. If provided, the
            result is written here instead of allocating a new array.

    Returns:
        Filtered array with high frequencies removed.
    """
    # signal size information
    m = int(self.nx / ratio)
    half = int(m / 2)

    # copy input array
    self.x[:] = x

    # compute rfft of x
    self.fft()

    # filter fx (keep low frequencies only)
    # With rfft, only non-negative frequencies exist
    self.fxf[:] = 0
    self.fxf[0:half] = self.fx[0:half]

    # compute irfft of fxf
    self.ifft()

    # return filtered x
    if out is not None:
        out[:] = self.x
        return out
    return self.x.copy()

downscale

downscale(x: ndarray, ratio: int, out: ndarray | None = None) -> np.ndarray

Downscale a field from DNS to LES resolution.

Uses Fourier filtering to project a high-resolution field onto a coarser grid while preserving low-frequency content.

Parameters:

Name Type Description Default
x ndarray

Input array at DNS resolution (real-valued).

required
ratio int

Downscaling ratio (nx2 / nx).

required
out ndarray | None

Optional pre-allocated output array. If provided, the result is written here instead of allocating a new array.

None

Returns:

Type Description
ndarray

Downscaled array at LES resolution.

Source code in pyburgers/utils/spectral.py
def downscale(
    self, x: np.ndarray, ratio: int, out: np.ndarray | None = None
) -> np.ndarray:
    """Downscale a field from DNS to LES resolution.

    Uses Fourier filtering to project a high-resolution field
    onto a coarser grid while preserving low-frequency content.

    Args:
        x: Input array at DNS resolution (real-valued).
        ratio: Downscaling ratio (nx2 / nx).
        out: Optional pre-allocated output array. If provided, the
            result is written here instead of allocating a new array.

    Returns:
        Downscaled array at LES resolution.
    """
    # zero output array to prevent stale data
    self.fxf[:] = 0

    # copy input array
    self.x2[:] = x

    # signal shape information - keep up to (but not including) LES Nyquist
    half = self.nx // 2

    # compute rfft of larger series
    self.fft2()

    # filter - transfer low frequencies to smaller array
    # With rfft, only non-negative frequencies exist
    self.fxf[0:half] = self.fx2[0:half]
    self.fxf[half] = 0  # Zero Nyquist

    # compute the irfft
    self.ifft()

    # return filtered downscaled field
    # Scale by 1/ratio to preserve amplitude when downscaling
    inv_ratio = 1.0 / ratio
    if out is not None:
        np.multiply(inv_ratio, self.x, out=out)
        return out
    return inv_ratio * self.x.copy()

SpectralWorkspace

SpectralWorkspace(nx: int, dx: float, nx2: int | None = None, noise_beta: float | None = None, noise_nx: int | None = None, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1, rng: Generator | None = None)

Centralized workspace for all spectral operations.

Manages FFT plans and aligned buffers for a simulation by bundling Derivatives, Dealias, Filter, and FBM noise utilities into a single workspace. All utilities share consistent FFTW settings (planning strategy, threads).

This design eliminates redundant FFT plan creation and ensures that resources are shared efficiently across the simulation.

Attributes:

Name Type Description
derivatives

Derivatives calculator for spatial derivatives.

dealias

Dealias calculator for nonlinear products.

filter

Filter for spectral cutoff and downscaling.

noise

FBM noise generator (only if noise_beta provided).

u ndarray

Primary velocity buffer (float64, size nx).

fu ndarray

Primary Fourier space buffer (complex128, size nx//2+1 for rfft).

Example

DNS workspace with noise

workspace = SpectralWorkspace(nx=512, dx=0.01, noise_beta=-0.75) derivs = workspace.derivatives.compute(u, order=[1, 2]) noise = workspace.noise.compute_noise()

LES workspace with filtering and noise at DNS resolution

workspace_les = SpectralWorkspace( ... nx=512, dx=0.01, nx2=8192, noise_beta=-0.75, noise_nx=8192 ... ) filtered = workspace_les.filter.cutoff(x, ratio=2)

Initialize the spectral workspace.

Parameters:

Name Type Description Default
nx int

Number of grid points for the simulation.

required
dx float

Grid spacing.

required
nx2 int | None

Optional number of grid points for DNS resolution (used in LES mode for downscaling noise from DNS to LES grid). If provided, creates Filter with downscaling capability.

None
noise_beta float | None

Optional FBM exponent for noise generation. If provided, creates FBM noise generator. Typical value is 0.75.

None
noise_nx int | None

Optional number of grid points for noise generation. If not provided and noise_beta is given, uses nx. For LES, set this to nx_dns to generate noise at DNS resolution.

None
fftw_planning str

FFTW planning strategy. Options: - 'FFTW_ESTIMATE': Fast planning, slower execution - 'FFTW_MEASURE': Balanced (default) - 'FFTW_PATIENT': Slow planning, faster execution

'FFTW_MEASURE'
fftw_threads int

Number of threads for FFTW operations.

1
Source code in pyburgers/utils/spectral_workspace.py
def __init__(
    self,
    nx: int,
    dx: float,
    nx2: int | None = None,
    noise_beta: float | None = None,
    noise_nx: int | None = None,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
    rng: np.random.Generator | None = None,
) -> None:
    """Initialize the spectral workspace.

    Args:
        nx: Number of grid points for the simulation.
        dx: Grid spacing.
        nx2: Optional number of grid points for DNS resolution
            (used in LES mode for downscaling noise from DNS to LES grid).
            If provided, creates Filter with downscaling capability.
        noise_beta: Optional FBM exponent for noise generation.
            If provided, creates FBM noise generator. Typical value is 0.75.
        noise_nx: Optional number of grid points for noise generation.
            If not provided and noise_beta is given, uses nx.
            For LES, set this to nx_dns to generate noise at DNS resolution.
        fftw_planning: FFTW planning strategy. Options:
            - 'FFTW_ESTIMATE': Fast planning, slower execution
            - 'FFTW_MEASURE': Balanced (default)
            - 'FFTW_PATIENT': Slow planning, faster execution
        fftw_threads: Number of threads for FFTW operations.
    """
    # Store configuration
    self.nx = nx
    self.dx = dx
    self.nx2 = nx2
    self.noise_beta = noise_beta
    self.noise_nx = noise_nx if noise_nx is not None else nx
    self.fftw_planning = fftw_planning
    self.fftw_threads = fftw_threads

    # Initialize all spectral utilities with consistent settings
    self.derivatives = Derivatives(
        nx=nx, dx=dx, fftw_planning=fftw_planning, fftw_threads=fftw_threads
    )

    # Dealias and Filter are only needed in LES mode (SGS models).
    # Skip creation in DNS mode to save memory and plan-building time.
    if nx2 is not None:
        self.dealias = Dealias(
            nx=nx,
            fftw_planning=fftw_planning,
            fftw_threads=fftw_threads,
            shared_der=self.derivatives,
        )
        self.filter = Filter(
            nx=nx,
            nx2=nx2,
            fftw_planning=fftw_planning,
            fftw_threads=fftw_threads,
        )
    else:
        self.dealias = None
        self.filter = None

    # Optionally create FBM noise generator
    # If noise_nx differs from nx (LES case), noise is generated at noise_nx
    # resolution and must be filtered down using self.filter.downscale()
    if noise_beta is not None:
        self.noise = FBM(
            beta=noise_beta,
            n_pts=self.noise_nx,
            fftw_planning=fftw_planning,
            fftw_threads=fftw_threads,
            rng=rng,
        )
    else:
        self.noise = None

    # Expose commonly used buffers for direct access
    # This allows code like: workspace.u[:] = initial_condition
    self.u: np.ndarray = self.derivatives.u
    self.fu: np.ndarray = self.derivatives.fu

__repr__

__repr__() -> str

Return a string representation of the workspace.

Source code in pyburgers/utils/spectral_workspace.py
def __repr__(self) -> str:
    """Return a string representation of the workspace."""
    filter_info = f", nx2={self.nx2}" if self.nx2 else ""
    noise_info = (
        f", noise_beta={self.noise_beta}, noise_nx={self.noise_nx}"
        if self.noise_beta is not None
        else ""
    )
    return (
        f"SpectralWorkspace(nx={self.nx}, dx={self.dx}{filter_info}{noise_info}, "
        f"fftw_planning='{self.fftw_planning}', fftw_threads={self.fftw_threads})"
    )

fftw

FFTW wisdom management for PyBurgers.

This module handles loading and saving FFTW wisdom to disk, which allows FFT plans to be reused across runs for faster initialization.

The wisdom file is stored at ~/.pyburgers_fftw_wisdom.

File locking is used to prevent race conditions when multiple PyBurgers instances access the wisdom file concurrently.

load_wisdom

load_wisdom() -> tuple[bool, str]

Load accumulated FFTW wisdom from cache file.

FFTW wisdom is cumulative: a single file can hold optimal plans for many different transform sizes. This function imports whatever plans are present; pyfftw will use matching plans and re-plan any missing sizes on-demand.

No parameter validation is performed — grid sizes and physical parameters do not affect whether a stored plan is valid.

Uses file locking to prevent concurrent access conflicts.

Returns:

Type Description
tuple[bool, str]

Tuple of (success: bool, message: str).

Source code in pyburgers/utils/fftw.py
def load_wisdom() -> tuple[bool, str]:
    """Load accumulated FFTW wisdom from cache file.

    FFTW wisdom is cumulative: a single file can hold optimal plans for
    many different transform sizes. This function imports whatever plans
    are present; pyfftw will use matching plans and re-plan any missing
    sizes on-demand.

    No parameter validation is performed — grid sizes and physical
    parameters do not affect whether a stored plan is valid.

    Uses file locking to prevent concurrent access conflicts.

    Returns:
        Tuple of (success: bool, message: str).
    """
    if not WISDOM_FILE.exists():
        return False, "No wisdom file found"

    try:
        with FileLock(str(WISDOM_FILE) + ".lock", timeout=LOCK_TIMEOUT):
            with open(WISDOM_FILE, "rb") as f:
                data = pickle.load(f)

        # Support both legacy dict format and current raw wisdom format
        wisdom = data.get("wisdom") if isinstance(data, dict) else data
        if wisdom is None:
            return False, "No wisdom data in file"

        pyfftw.import_wisdom(wisdom)
        return True, "Wisdom loaded successfully"

    except Timeout:
        return False, f"Lock timeout after {LOCK_TIMEOUT}s"
    except Exception as e:
        return False, f"Error loading wisdom: {e}"

save_wisdom

save_wisdom() -> bool

Save accumulated FFTW wisdom to cache file.

Exports all plans that have been created in this session and writes them to the cache file, merging with any plans already present. Plans accumulate across runs so that each new configuration adds to the file rather than replacing it.

Uses file locking to prevent concurrent write conflicts.

Returns:

Type Description
bool

True if wisdom was saved successfully, False otherwise.

Source code in pyburgers/utils/fftw.py
def save_wisdom() -> bool:
    """Save accumulated FFTW wisdom to cache file.

    Exports all plans that have been created in this session and writes
    them to the cache file, merging with any plans already present.
    Plans accumulate across runs so that each new configuration adds to
    the file rather than replacing it.

    Uses file locking to prevent concurrent write conflicts.

    Returns:
        True if wisdom was saved successfully, False otherwise.
    """
    try:
        with FileLock(str(WISDOM_FILE) + ".lock", timeout=LOCK_TIMEOUT):
            with open(WISDOM_FILE, "wb") as f:
                pickle.dump(pyfftw.export_wisdom(), f)
        return True
    except Timeout:
        return False
    except Exception:
        return False

warmup_fftw_plans

warmup_fftw_plans(nx_dns: int, nx_les: int, noise_beta: float, fftw_planning: str, fftw_threads: int, domain_length: float = 2 * 3.141592653589793) -> tuple[bool, str]

Generate FFTW plans for common PyBurgers sizes.

This creates representative FFTW plans for DNS/LES grids, filters, dealiasing, and FBM noise so wisdom can be saved once and reused.

If warmup fails (e.g., out of memory, invalid parameters), the function returns False and a descriptive error message. The simulation can still continue - plans will be created on-demand during initialization.

Parameters:

Name Type Description Default
nx_dns int

DNS grid resolution.

required
nx_les int

LES grid resolution.

required
noise_beta float

FBM noise exponent.

required
fftw_planning str

FFTW planning strategy.

required
fftw_threads int

Number of FFTW threads.

required
domain_length float

Length of the periodic domain (default: 2π).

2 * 3.141592653589793

Returns:

Type Description
bool

Tuple of (success: bool, message: str) indicating whether warmup

str

completed successfully and any relevant diagnostic information.

Source code in pyburgers/utils/fftw.py
def warmup_fftw_plans(
    nx_dns: int,
    nx_les: int,
    noise_beta: float,
    fftw_planning: str,
    fftw_threads: int,
    domain_length: float = 2 * 3.141592653589793,
) -> tuple[bool, str]:
    """Generate FFTW plans for common PyBurgers sizes.

    This creates representative FFTW plans for DNS/LES grids, filters,
    dealiasing, and FBM noise so wisdom can be saved once and reused.

    If warmup fails (e.g., out of memory, invalid parameters), the function
    returns False and a descriptive error message. The simulation can still
    continue - plans will be created on-demand during initialization.

    Args:
        nx_dns: DNS grid resolution.
        nx_les: LES grid resolution.
        noise_beta: FBM noise exponent.
        fftw_planning: FFTW planning strategy.
        fftw_threads: Number of FFTW threads.
        domain_length: Length of the periodic domain (default: 2π).

    Returns:
        Tuple of (success: bool, message: str) indicating whether warmup
        completed successfully and any relevant diagnostic information.
    """
    from .spectral_workspace import SpectralWorkspace

    try:
        # Warm DNS components using SpectralWorkspace
        if nx_dns > 0:
            try:
                SpectralWorkspace(
                    nx=nx_dns,
                    dx=domain_length / nx_dns,
                    noise_beta=noise_beta,
                    noise_nx=nx_dns,
                    fftw_planning=fftw_planning,
                    fftw_threads=fftw_threads,
                )
            except Exception as e:
                raise RuntimeError(f"Failed to create DNS workspace (nx={nx_dns}): {e}") from e

        # Warm LES components using SpectralWorkspace
        if nx_les > 0:
            try:
                SpectralWorkspace(
                    nx=nx_les,
                    dx=domain_length / nx_les,
                    nx2=nx_dns,
                    noise_beta=noise_beta,
                    noise_nx=nx_dns,
                    fftw_planning=fftw_planning,
                    fftw_threads=fftw_threads,
                )
            except Exception as e:
                raise RuntimeError(
                    f"Failed to create LES workspace (nx={nx_les}, nx2={nx_dns}): {e}"
                ) from e

        return True, "FFTW plans created successfully"

    except Exception as e:
        # Warmup failed, but this is not fatal - plans will be created on demand
        return False, f"Warmup failed: {e}"

FBM

FBM(beta: float, n_pts: int, fftw_planning: str = 'FFTW_MEASURE', fftw_threads: int = 1, rng: Generator | None = None)

Generates fractional Brownian motion (FBM) noise.

FBM noise is used as the stochastic forcing term in the Burgers equation. The noise has a power spectrum that scales as k^(-beta).

Attributes:

Name Type Description
beta

FBM exponent, controls spectral slope.

n_pts

Number of grid points.

fftw_planning

FFTW planning strategy.

fftw_threads

Number of FFTW threads.

nyquist

Nyquist mode index (n/2).

wavenumber

Wavenumber array for spectral coloring.

Initialize the FBM noise generator.

Parameters:

Name Type Description Default
beta float

FBM exponent controlling the spectral slope. Typical value is -0.75 for Burgers turbulence.

required
n_pts int

Number of grid points.

required
fftw_planning str

FFTW planning strategy (default: 'FFTW_MEASURE').

'FFTW_MEASURE'
fftw_threads int

Number of FFTW threads (default: 1).

1
rng Generator | None

NumPy Generator for reproducible noise. If None, a fresh default_rng() is created (non-reproducible).

None
Source code in pyburgers/utils/fbm.py
def __init__(
    self,
    beta: float,
    n_pts: int,
    fftw_planning: str = "FFTW_MEASURE",
    fftw_threads: int = 1,
    rng: np.random.Generator | None = None,
) -> None:
    """Initialize the FBM noise generator.

    Args:
        beta: FBM exponent controlling the spectral slope. Typical value
            is -0.75 for Burgers turbulence.
        n_pts: Number of grid points.
        fftw_planning: FFTW planning strategy (default: 'FFTW_MEASURE').
        fftw_threads: Number of FFTW threads (default: 1).
        rng: NumPy Generator for reproducible noise. If None, a fresh
            default_rng() is created (non-reproducible).
    """
    self.rng = rng if rng is not None else np.random.default_rng()
    self.beta = beta
    self.n_pts = n_pts
    self.fftw_planning = fftw_planning
    self.fftw_threads = fftw_threads

    # Computed values
    self.nyquist = int(0.5 * n_pts)
    self.nk = self.nyquist + 1
    wavenumber = np.fft.rfftfreq(n_pts, d=1 / n_pts)
    wavenumber[0] = 1  # Avoid /0; DC component is 0 in compute_noise()

    # Precompute scaling factor for white noise generation
    self._sqrt_n = np.sqrt(n_pts)

    # Precompute spectral coloring coefficients (k^(beta/2))
    # This avoids redundant power computation in compute_noise()
    self._coloring = wavenumber ** (0.5 * beta)

    # pyfftw arrays (real <-> complex rfft/irfft)
    self.x = pyfftw.empty_aligned(n_pts, np.float64)
    self.fx = pyfftw.empty_aligned(self.nk, np.complex128)
    self.fxn = pyfftw.empty_aligned(self.nk, np.complex128)
    self.noise = pyfftw.empty_aligned(n_pts, np.float64)

    # pyfftw functions
    self.fft = pyfftw.FFTW(
        self.x,
        self.fx,
        direction="FFTW_FORWARD",
        flags=(self.fftw_planning,),
        threads=self.fftw_threads,
    )

    self.ifft = pyfftw.FFTW(
        self.fxn,
        self.noise,
        direction="FFTW_BACKWARD",
        flags=(self.fftw_planning,),
        threads=self.fftw_threads,
    )

compute_noise

compute_noise() -> NDArray[np.float64]

Generate a realization of FBM noise.

Creates white noise, transforms to spectral space, applies the FBM spectral coloring (k^(beta/2)), and transforms back.

Returns:

Type Description
NDArray[float64]

Real-valued noise array with FBM spectral characteristics.

Note

The returned array is an internal buffer that will be overwritten on the next call to compute_noise(). If you need to preserve the values, make a copy: noise.copy().

Source code in pyburgers/utils/fbm.py
def compute_noise(self) -> NDArray[np.float64]:
    """Generate a realization of FBM noise.

    Creates white noise, transforms to spectral space, applies
    the FBM spectral coloring (k^(beta/2)), and transforms back.

    Returns:
        Real-valued noise array with FBM spectral characteristics.

    Note:
        The returned array is an internal buffer that will be overwritten
        on the next call to compute_noise(). If you need to preserve the
        values, make a copy: ``noise.copy()``.
    """
    # Generate white noise directly into the aligned buffer, then scale
    self.rng.standard_normal(out=self.x)
    self.x *= self._sqrt_n

    # Transform to spectral space
    self.fft()

    # Zero-out DC and Nyquist modes, apply precomputed spectral coloring
    self.fx[0] = 0
    self.fx[self.nyquist] = 0
    self.fxn[:] = self.fx * self._coloring

    # Transform back to physical space
    self.ifft()

    return self.noise

constants

Defines constants for PyBurgers.

This module provides a centralized location for all constants used throughout PyBurgers. Constants are grouped into logical namespaces using frozen dataclasses.

Module-level instances

spectral: Spectral algorithm constants. sgs: Subgrid-scale model constants.

spectral module-attribute

spectral = SpectralConstants()

sgs module-attribute

sgs = SGSConstants()

SpectralConstants dataclass

SpectralConstants(DEALIAS_SCALE: float = 3.0 / 2.0)

Spectral algorithm constants.

Attributes:

Name Type Description
DEALIAS_SCALE float

Scale factor for dealiasing using 3/2 padding rule

DEALIAS_SCALE class-attribute instance-attribute

DEALIAS_SCALE: float = 3.0 / 2.0

SGSConstants dataclass

SGSConstants(TEST_FILTER_RATIO: int = 2, SMAG_CONSTANT_CS: float = 0.16, DEARDORFF_CE: float = 0.7, DEARDORFF_C1: float = 0.1, WONGLILLY_EXPONENT: float = 4.0 / 3.0)

Subgrid-scale model constants.

Attributes:

Name Type Description
TEST_FILTER_RATIO int

Ratio for test filter width in dynamic models

SMAG_CONSTANT_CS float

Smagorinsky constant (Cs) for constant-coefficient model

DEARDORFF_CE float

Deardorff model constant for TKE dissipation

DEARDORFF_C1 float

Deardorff model constant for eddy viscosity

WONGLILLY_EXPONENT float

Exponent for Wong-Lilly dynamic procedure (4/3)

TEST_FILTER_RATIO class-attribute instance-attribute

TEST_FILTER_RATIO: int = 2

SMAG_CONSTANT_CS class-attribute instance-attribute

SMAG_CONSTANT_CS: float = 0.16

DEARDORFF_CE class-attribute instance-attribute

DEARDORFF_CE: float = 0.7

DEARDORFF_C1 class-attribute instance-attribute

DEARDORFF_C1: float = 0.1

WONGLILLY_EXPONENT class-attribute instance-attribute

WONGLILLY_EXPONENT: float = 4.0 / 3.0

logging_helper

Logging configuration for PyBurgers.

This module provides centralized logging setup and helper functions for consistent logging across all PyBurgers modules.

setup_logging

setup_logging(level: str | int = 'INFO', format_string: str | None = None, log_file: str | None = None, file_mode: str = 'w') -> None

Configure the root logger for PyBurgers.

This should be called once at application startup, typically in the main entry point (burgers.py).

Parameters:

Name Type Description Default
level str | int

Log level as string ("DEBUG", "INFO", etc.) or int.

'INFO'
format_string str | None

Optional custom format string. If None, uses DEBUG_FORMAT for DEBUG level, DEFAULT_FORMAT otherwise.

None
log_file str | None

Optional log file path for file logging.

None
file_mode str

File mode for log file handler (default: "w").

'w'
Source code in pyburgers/utils/logging_helper.py
def setup_logging(
    level: str | int = "INFO",
    format_string: str | None = None,
    log_file: str | None = None,
    file_mode: str = "w",
) -> None:
    """Configure the root logger for PyBurgers.

    This should be called once at application startup, typically in
    the main entry point (burgers.py).

    Args:
        level: Log level as string ("DEBUG", "INFO", etc.) or int.
        format_string: Optional custom format string. If None, uses
            DEBUG_FORMAT for DEBUG level, DEFAULT_FORMAT otherwise.
        log_file: Optional log file path for file logging.
        file_mode: File mode for log file handler (default: "w").
    """
    # Convert string level to int if needed
    if isinstance(level, str):
        level = getattr(logging, level.upper(), logging.INFO)

    # Configure root logger
    root_logger = logging.getLogger("PyBurgers")
    root_logger.setLevel(level)

    # Remove existing handlers to avoid duplicates
    root_logger.handlers.clear()

    # Create console handler
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(log_format)
    handler.addFilter(_SkipProgressFilter())
    root_logger.addHandler(handler)

    # Create progress handler (same format, overwrites current line)
    progress_handler = _ProgressHandler(sys.stdout)
    progress_handler.setLevel(level)
    progress_handler.setFormatter(log_format)
    progress_handler.addFilter(_ProgressOnlyFilter())
    root_logger.addHandler(progress_handler)

    if log_file:
        log_path = Path(log_file).expanduser()
        if log_path.parent != Path("."):
            log_path.parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(log_path, mode=file_mode, encoding="utf-8")
        file_handler.setLevel(level)
        file_handler.setFormatter(log_format)
        file_handler.addFilter(_SkipProgressFilter())
        root_logger.addHandler(file_handler)

    # Prevent propagation to root logger
    root_logger.propagate = False

get_logger

get_logger(name: str) -> logging.Logger

Get a logger for the specified module/class.

Creates a child logger under the PyBurgers namespace for consistent hierarchical logging.

Parameters:

Name Type Description Default
name str

Name for the logger, typically the class or module name. Examples: "DNS", "LES", "SGS", "Input", "Output"

required

Returns:

Type Description
Logger

Configured logger instance.

Example

logger = get_logger("DNS") logger.info("Starting simulation") [PyBurgers: DNS] Starting simulation

Source code in pyburgers/utils/logging_helper.py
def get_logger(name: str) -> logging.Logger:
    """Get a logger for the specified module/class.

    Creates a child logger under the PyBurgers namespace for consistent
    hierarchical logging.

    Args:
        name: Name for the logger, typically the class or module name.
            Examples: "DNS", "LES", "SGS", "Input", "Output"

    Returns:
        Configured logger instance.

    Example:
        >>> logger = get_logger("DNS")
        >>> logger.info("Starting simulation")
        [PyBurgers: DNS]     Starting simulation
    """
    full_name = f"PyBurgers.{name}"

    if full_name not in _loggers:
        _loggers[full_name] = logging.getLogger(full_name)

    return _loggers[full_name]

get_log_level

get_log_level(level_name: str) -> int

Convert a log level name to its integer value.

Parameters:

Name Type Description Default
level_name str

Case-insensitive level name ("debug", "INFO", etc.)

required

Returns:

Type Description
int

Integer log level value.

Raises:

Type Description
ValueError

If level_name is not a valid log level.

Source code in pyburgers/utils/logging_helper.py
def get_log_level(level_name: str) -> int:
    """Convert a log level name to its integer value.

    Args:
        level_name: Case-insensitive level name ("debug", "INFO", etc.)

    Returns:
        Integer log level value.

    Raises:
        ValueError: If level_name is not a valid log level.
    """
    level_name = level_name.upper()
    level = getattr(logging, level_name, None)

    if level is None:
        valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        raise ValueError(
            f"Invalid log level: '{level_name}'. Valid options: {', '.join(valid_levels)}"
        )

    return level

Input/Output

Input

Input(namelist_path: str)

Orchestrates the loading and validation of all model inputs.

This class reads configuration from a JSON namelist file. All data is validated and organized into the appropriate dataclasses.

Attributes:

Name Type Description
time TimeConfig

Dataclass with time-related parameters (duration).

physics PhysicsConfig

Dataclass with physics parameters (noise, viscosity).

grid GridConfig

Dataclass with grid configuration (length, DNS, LES).

numerics NumericsConfig

Dataclass with numerical method selections (integration, cfl, max_step).

output OutputConfig

Dataclass with output file configuration.

logging LoggingConfig

Dataclass with logging settings.

fftw FFTWConfig

Dataclass with FFTW configuration.

Initialize the Input class and load all configuration.

Parameters:

Name Type Description Default
namelist_path str

The file path to the JSON namelist.

required

Raises:

Type Description
FileNotFoundError

If the namelist file does not exist.

JSONDecodeError

If the namelist JSON file is malformed.

NamelistError

If required configuration is missing or invalid.

Source code in pyburgers/utils/io/input.py
def __init__(self, namelist_path: str) -> None:
    """Initialize the Input class and load all configuration.

    Args:
        namelist_path: The file path to the JSON namelist.

    Raises:
        FileNotFoundError: If the namelist file does not exist.
        json.JSONDecodeError: If the namelist JSON file is malformed.
        NamelistError: If required configuration is missing or invalid.
    """
    # Set up basic logging before we can read the log level
    setup_logging(level="INFO")
    self.logger: logging.Logger = get_logger("Input")
    self.logger.info("Reading %s", namelist_path)

    # Validation fills in schema defaults, so all optional keys below
    # are guaranteed to exist. The only fields accessed without a
    # schema default are the required ones (time.duration, physics.viscosity).
    namelist_data = self._load_and_validate_namelist(namelist_path)

    # Extract and finalize logging config first so we can adjust log level
    logging_data = namelist_data["logging"]
    log_file = logging_data["file"] or None  # treat "" as None
    self.logging: LoggingConfig = LoggingConfig(
        level=logging_data["level"], file=log_file
    )
    setup_logging(level=logging_data["level"], log_file=log_file)

    # Time configuration
    time_data = namelist_data["time"]
    self.time: TimeConfig = TimeConfig(duration=float(time_data["duration"]))

    # Numerics configuration
    numerics_data = namelist_data["numerics"]
    self.numerics: NumericsConfig = NumericsConfig(
        temporal=int(numerics_data["temporal"]),
        spatial=int(numerics_data["spatial"]),
        cfl=float(numerics_data["cfl"]),
        max_step=float(numerics_data["max_step"]),
    )

    # AB2 has a limited stability region for hyperbolic terms; warn if CFL
    # is set higher than the recommended limit for that scheme.
    if self.numerics.temporal == 1 and self.numerics.cfl > 0.4:
        self.logger.warning(
            "CFL target %.2f exceeds the recommended limit of 0.4 for AB2 "
            "(temporal=1). Consider reducing cfl or switching to RK3 (temporal=3).",
            self.numerics.cfl,
        )

    # Grid configuration (DNS and LES)
    grid_data = namelist_data["grid"]
    self.grid: GridConfig = GridConfig(
        length=float(grid_data["length"]),
        dns=DNSConfig(points=int(grid_data["dns"]["points"])),
        les=LESConfig(points=int(grid_data["les"]["points"])),
    )

    dns_pts = self.grid.dns.points
    les_pts = self.grid.les.points
    if dns_pts % les_pts != 0:
        raise NamelistError(
            f"grid.dns.points ({dns_pts}) must be divisible by "
            f"grid.les.points ({les_pts})"
        )

    # Physics configuration
    physics_data = namelist_data["physics"]
    noise_data = physics_data["noise"]
    self.physics: PhysicsConfig = PhysicsConfig(
        noise=NoiseConfig(
            exponent=float(noise_data["exponent"]),
            amplitude=float(noise_data["amplitude"]),
            seed=int(noise_data["seed"]) if noise_data["seed"] is not None else None,
        ),
        viscosity=float(physics_data["viscosity"]),
        subgrid_model=int(physics_data["subgrid_model"]),
    )

    # Output configuration. interval_save and interval_print have no
    # schema default because the sensible default is tied to max_step.
    output_data = namelist_data["output"]
    default_interval = 100 * self.numerics.max_step
    self.output: OutputConfig = OutputConfig(
        interval_save=float(output_data.get("interval_save", default_interval)),
        interval_print=float(output_data.get("interval_print", default_interval)),
    )

    # FFTW configuration
    fftw_data = namelist_data["fftw"]
    self.fftw: FFTWConfig = FFTWConfig(
        planning=str(fftw_data["planning"]),
        threads=int(fftw_data["threads"]),
    )

    self._log_configuration()
    self.logger.info("--- namelist loaded successfully")

log_level property

log_level: str

Convenience accessor for log level.

fftw_planning property

fftw_planning: str

Convenience accessor for FFTW planning strategy.

fftw_threads property

fftw_threads: int

Convenience accessor for FFTW thread count.

cfl_target property

cfl_target: float

Target CFL number for adaptive time stepping.

max_step property

max_step: float

Maximum allowed time step.

domain_length property

domain_length: float

Convenience accessor for domain length.

viscosity property

viscosity: float

Convenience accessor for viscosity.

t_save property

t_save: float

Save interval in seconds.

t_print property

t_print: float

Print interval in seconds.

temporal property

temporal: int

Time integration scheme identifier.

get_dns_config

get_dns_config() -> dict[str, Any]

Get DNS-specific configuration as a dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary with DNS configuration values.

Source code in pyburgers/utils/io/input.py
def get_dns_config(self) -> dict[str, Any]:
    """Get DNS-specific configuration as a dictionary.

    Returns:
        Dictionary with DNS configuration values.
    """
    return {
        "nx": self.grid.dns.points,
        "cfl": self.numerics.cfl,
        "max_step": self.numerics.max_step,
        "temporal": self.numerics.temporal,
        "viscosity": self.physics.viscosity,
        "noise_beta": self.physics.noise.exponent,
        "noise_amplitude": self.physics.noise.amplitude,
        "t_save": self.output.interval_save,
        "domain_length": self.grid.length,
    }

get_les_config

get_les_config() -> dict[str, Any]

Get LES-specific configuration as a dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary with LES configuration values.

Source code in pyburgers/utils/io/input.py
def get_les_config(self) -> dict[str, Any]:
    """Get LES-specific configuration as a dictionary.

    Returns:
        Dictionary with LES configuration values.
    """
    return {
        "nx": self.grid.les.points,
        "sgs_model": self.physics.subgrid_model,
        "cfl": self.numerics.cfl,
        "max_step": self.numerics.max_step,
        "temporal": self.numerics.temporal,
        "viscosity": self.physics.viscosity,
        "noise_beta": self.physics.noise.exponent,
        "noise_amplitude": self.physics.noise.amplitude,
        "t_save": self.output.interval_save,
        "domain_length": self.grid.length,
    }

Output

Output(outfile: str, sync_interval: int = 100)

Manages the creation and writing of NetCDF output files.

This class handles all aspects of the output file, from its initial creation to writing data at each time step and final closing.

Attributes:

Name Type Description
outfile Dataset

A netCDF4.Dataset object representing the output file.

fields_time dict[str, Any]

A dictionary mapping time-varying field names to their NetCDF variable objects.

fields_static dict[str, Any]

A dictionary mapping static field names to their NetCDF variable objects.

attributes dict[str, dict[str, Any]]

A dictionary defining the metadata (dimensions, units, etc.) for each possible output variable.

Initialize the Output class and create the NetCDF file.

Parameters:

Name Type Description Default
outfile str

The path and name for the output NetCDF file.

required
sync_interval int

Number of saves between disk syncs. Higher values improve performance but risk data loss on crash. Defaults to 100.

100
Source code in pyburgers/utils/io/output.py
def __init__(self, outfile: str, sync_interval: int = 100) -> None:
    """Initialize the Output class and create the NetCDF file.

    Args:
        outfile: The path and name for the output NetCDF file.
        sync_interval: Number of saves between disk syncs. Higher values
            improve performance but risk data loss on crash. Defaults to 100.
    """
    self.logger: logging.Logger = get_logger("Output")
    self.logger.info("Saving output to %s", outfile)
    self.outfile: nc.Dataset = nc.Dataset(outfile, "w")
    self._sync_interval = sync_interval
    self._save_count = 0

    self.outfile.description = "PyBurgers output"
    self.outfile.source = "PyBurgers - 1D Stochastic Burgers Equation Solver"
    self.outfile.history = "Created " + time_module.ctime(time_module.time())

    self.fields_time: dict[str, Any] = {}
    self.fields_static: dict[str, Any] = {}
    self.attributes: dict[str, dict[str, Any]] = {
        "time": {"dimension": ("t",), "long_name": "time", "units": "s"},
        "x": {"dimension": ("x",), "long_name": "x-distance", "units": "m"},
        "u": {"dimension": ("t", "x"), "long_name": "u-component velocity", "units": "m s-1"},
        "tke": {
            "dimension": ("t",),
            "long_name": "turbulence kinetic energy",
            "units": "m2 s-2",
        },
        "tke_sgs": {
            "dimension": ("t",),
            "long_name": "subgrid turbulence kinetic energy",
            "units": "m2 s-2",
        },
        "tke_sgs_prod": {
            "dimension": ("t",),
            "long_name": "subgrid TKE production",
            "units": "m2 s-3",
        },
        "tke_sgs_diff": {
            "dimension": ("t",),
            "long_name": "subgrid TKE diffusion",
            "units": "m2 s-3",
        },
        "tke_sgs_diss": {
            "dimension": ("t",),
            "long_name": "subgrid TKE dissipation",
            "units": "m2 s-3",
        },
        "C_sgs": {"dimension": ("t",), "long_name": "subgrid model coefficient", "units": "--"},
        "diss_sgs": {
            "dimension": ("t",),
            "long_name": "subgrid dissipation",
            "units": "m2 s-3",
        },
        "diss_mol": {
            "dimension": ("t",),
            "long_name": "molecular dissipation",
            "units": "m2 s-3",
        },
        "ens_prod": {"dimension": ("t",), "long_name": "enstrophy production", "units": "s-3"},
        "ens_diss_sgs": {
            "dimension": ("t",),
            "long_name": "subgrid enstrophy dissipation",
            "units": "s-3",
        },
        "ens_diss_mol": {
            "dimension": ("t",),
            "long_name": "molecular enstrophy dissipation",
            "units": "s-3",
        },
    }

set_dims

set_dims(dims: dict[str, int]) -> None

Set the dimensions in the NetCDF output file.

Parameters:

Name Type Description Default
dims dict[str, int]

A dictionary mapping dimension names to their sizes. A size of 0 indicates an unlimited dimension.

required
Source code in pyburgers/utils/io/output.py
def set_dims(self, dims: dict[str, int]) -> None:
    """Set the dimensions in the NetCDF output file.

    Args:
        dims: A dictionary mapping dimension names to their sizes. A size
            of 0 indicates an unlimited dimension.
    """
    for dim, size in dims.items():
        if size == 0:
            self.outfile.createDimension(dim)
        else:
            self.outfile.createDimension(dim, size)

set_fields

set_fields(fields: dict[str, Any]) -> None

Create the variables (fields) in the NetCDF output file.

Parameters:

Name Type Description Default
fields dict[str, Any]

A dictionary of fields to be created in the output file.

required
Source code in pyburgers/utils/io/output.py
def set_fields(self, fields: dict[str, Any]) -> None:
    """Create the variables (fields) in the NetCDF output file.

    Args:
        fields: A dictionary of fields to be created in the output file.
    """
    # Add time manually
    dims = self.attributes["time"]["dimension"]
    name = self.attributes["time"]["long_name"]
    units = self.attributes["time"]["units"]
    ncvar = self.outfile.createVariable("time", "f8", dims)
    ncvar.units = units
    ncvar.long_name = name
    self.fields_time["time"] = ncvar

    # Iterate through keys in dictionary
    for field in fields:
        if field not in self.attributes:
            self.logger.warning("Unknown field %s, skipping", field)
            continue
        dims = self.attributes[field]["dimension"]
        units = self.attributes[field]["units"]
        name = self.attributes[field]["long_name"]
        ncvar = self.outfile.createVariable(field, "f8", dims)
        ncvar.units = units
        ncvar.long_name = name
        if "t" in dims:
            self.fields_time[field] = ncvar
        else:
            self.fields_static[field] = ncvar

save

save(fields: dict[str, Any], tidx: int, time: float, initial: bool = False) -> None

Save a snapshot of the simulation state to the output file.

Parameters:

Name Type Description Default
fields dict[str, Any]

A dictionary of data fields to save.

required
tidx int

The time index for the current snapshot.

required
time float

The simulation time in seconds.

required
initial bool

A boolean flag indicating if this is the initial save, in which case only static fields are written. Defaults to False.

False
Source code in pyburgers/utils/io/output.py
def save(self, fields: dict[str, Any], tidx: int, time: float, initial: bool = False) -> None:
    """Save a snapshot of the simulation state to the output file.

    Args:
        fields: A dictionary of data fields to save.
        tidx: The time index for the current snapshot.
        time: The simulation time in seconds.
        initial: A boolean flag indicating if this is the initial save,
            in which case only static fields are written. Defaults to False.
    """
    # Save static fields only on initial save
    if initial:
        for field, static_var in self.fields_static.items():
            if field in fields:
                static_var[:] = np.asarray(fields[field])

    # Save time-varying fields
    for field, field_var in self.fields_time.items():
        dim = self.attributes[field]["dimension"]
        if len(dim) == 1:
            if field == "time":
                field_var[tidx] = time
            elif field in fields:
                field_var[tidx] = fields[field]
        else:
            if field in fields:
                field_var[tidx, :] = np.real(np.asarray(fields[field]))

    self._save_count += 1
    if self._sync_interval > 0 and self._save_count % self._sync_interval == 0:
        self.outfile.sync()

close

close() -> None

Close the NetCDF output file.

Performs a final sync to ensure all buffered data is written before closing the file.

Source code in pyburgers/utils/io/output.py
def close(self) -> None:
    """Close the NetCDF output file.

    Performs a final sync to ensure all buffered data is written before
    closing the file.
    """
    self.outfile.sync()
    self.outfile.close()
    self.logger.info("Output file closed")

Physics

SGS

SGS(input_obj: Input, spectral: SpectralWorkspace)

Base class for subgrid-scale models.

Provides the interface and factory method for SGS models used in Large-Eddy Simulation. The base class returns zero SGS stress, equivalent to no subgrid model.

Attributes:

Name Type Description
input

Input configuration object.

spectral

SpectralWorkspace with shared spectral utilities.

nx

Number of LES grid points.

dx

Grid spacing.

result dict[str, Any]

Dictionary containing SGS stress (tau) and coefficient.

Initialize the SGS model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object containing simulation parameters.

required
spectral SpectralWorkspace

SpectralWorkspace containing shared spectral utilities. The base SGS class does not use this, but accepts it for consistency with subclasses.

required
Source code in pyburgers/physics/sgs/sgs.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the SGS model.

    Args:
        input_obj: Input configuration object containing simulation
            parameters.
        spectral: SpectralWorkspace containing shared spectral utilities.
            The base SGS class does not use this, but accepts it for
            consistency with subclasses.
    """
    self.input = input_obj
    self.spectral = spectral
    self.nx = input_obj.grid.les.points
    self.dx = input_obj.domain_length / self.nx
    self.fftw_planning = input_obj.fftw_planning
    self.fftw_threads = input_obj.fftw_threads

    # SGS terms dictionary
    self.result: dict[str, Any] = {"tau": np.zeros(self.nx), "coeff": 0}

get_model staticmethod

get_model(model: int, input_obj: Input, spectral: SpectralWorkspace) -> SGS

Factory method to create the appropriate SGS model.

Parameters:

Name Type Description Default
model int

SGS model type identifier. 1 = Constant-coefficient Smagorinsky 2 = Dynamic Smagorinsky 3 = Dynamic Wong-Lilly 4 = Deardorff 1.5-order TKE

required
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace containing shared spectral utilities (Derivatives, Dealias, Filter). REQUIRED for all SGS models.

required

Returns:

Type Description
SGS

Instance of the requested SGS model subclass.

Raises:

Type Description
ValueError

If model is not a valid option (1-4).

Source code in pyburgers/physics/sgs/sgs.py
@staticmethod
def get_model(model: int, input_obj: Input, spectral: SpectralWorkspace) -> SGS:
    """Factory method to create the appropriate SGS model.

    Args:
        model: SGS model type identifier.
            1 = Constant-coefficient Smagorinsky
            2 = Dynamic Smagorinsky
            3 = Dynamic Wong-Lilly
            4 = Deardorff 1.5-order TKE
        input_obj: Input configuration object.
        spectral: SpectralWorkspace containing shared spectral utilities
            (Derivatives, Dealias, Filter). REQUIRED for all SGS models.

    Returns:
        Instance of the requested SGS model subclass.

    Raises:
        ValueError: If model is not a valid option (1-4).
    """
    if model == 1:
        from .sgs_smagcon import SmagConstant

        return SmagConstant(input_obj, spectral)
    if model == 2:
        from .sgs_smagdyn import SmagDynamic

        return SmagDynamic(input_obj, spectral)
    if model == 3:
        from .sgs_wonglilly import WongLilly

        return WongLilly(input_obj, spectral)
    if model == 4:
        from .sgs_deardorff import Deardorff

        return Deardorff(input_obj, spectral)
    raise ValueError(f"--- unknown SGS model ID: {model}. Valid options: 1-4.")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the SGS stress tensor.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient (du/dx) array.

required
tke_sgs ndarray | float

Subgrid TKE (used by Deardorff model).

required
dt float

Current time step size.

required

Returns:

Type Description
dict[str, Any]

Dictionary containing: - 'tau': SGS stress array - 'coeff': Model coefficient - 'tke_sgs': Updated subgrid TKE (Deardorff only)

Source code in pyburgers/physics/sgs/sgs.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the SGS stress tensor.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient (du/dx) array.
        tke_sgs: Subgrid TKE (used by Deardorff model).
        dt: Current time step size.

    Returns:
        Dictionary containing:
            - 'tau': SGS stress array
            - 'coeff': Model coefficient
            - 'tke_sgs': Updated subgrid TKE (Deardorff only)
    """
    return self.result

SmagConstant

SmagConstant(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Constant-coefficient Smagorinsky subgrid-scale model.

The classic Smagorinsky model computes SGS stress as

tau = -2 * Cs^2 * dx^2 * |S| * S

where Cs is a constant (typically 0.16) and S is the strain rate.

Uses the shared spectral workspace for dealiasing operations.

Initialize the constant Smagorinsky model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Dealias utility.

required
Source code in pyburgers/physics/sgs/sgs_smagcon.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the constant Smagorinsky model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Dealias utility.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- using the Smagorinsky model")

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the Smagorinsky SGS stress.

Parameters:

Name Type Description Default
u ndarray

Velocity field array (unused).

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Subgrid TKE (unused in this model).

required
dt float

Current time step size (unused in this model).

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).

Source code in pyburgers/physics/sgs/sgs_smagcon.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the Smagorinsky SGS stress.

    Args:
        u: Velocity field array (unused).
        dudx: Velocity gradient array.
        tke_sgs: Subgrid TKE (unused in this model).
        dt: Current time step size (unused in this model).

    Returns:
        Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).
    """
    # Model constants
    cs = c.sgs.SMAG_CONSTANT_CS
    cs2 = cs**2

    dudx2 = self.spectral.dealias.compute(dudx)

    np.multiply(-2.0 * cs2 * self.dx**2, dudx2, out=self.result["tau"])
    self.result["coeff"] = cs

    return self.result

SmagDynamic

SmagDynamic(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Dynamic Smagorinsky subgrid-scale model.

Uses the Germano identity to dynamically compute the Smagorinsky coefficient from the resolved velocity field. This removes the need for tuning Cs and allows it to adapt to local flow conditions.

Uses the shared spectral workspace for filtering and dealiasing operations.

Initialize the dynamic Smagorinsky model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Dealias and Filter utilities.

required
Source code in pyburgers/physics/sgs/sgs_smagdyn.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the dynamic Smagorinsky model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Dealias and Filter utilities.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- using the Dynamic Smagorinsky model")

    # Pre-allocate scratch arrays to avoid temporaries in the hot loop
    nx = self.nx
    self._scratch_a = np.zeros(nx)
    self._scratch_b = np.zeros(nx)
    self._scratch_c = np.zeros(nx)
    self._filt_a = np.zeros(nx)  # filter.cutoff output buffer
    self._filt_b = np.zeros(nx)  # filter.cutoff output buffer

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the dynamic Smagorinsky SGS stress.

Uses test filtering to compute the Leonard stress L and model tensor M, then determines Cs^2 from their contraction.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Subgrid TKE (unused in this model).

required
dt float

Current time step size (unused in this model).

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).

Source code in pyburgers/physics/sgs/sgs_smagdyn.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the dynamic Smagorinsky SGS stress.

    Uses test filtering to compute the Leonard stress L and
    model tensor M, then determines Cs^2 from their contraction.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient array.
        tke_sgs: Subgrid TKE (unused in this model).
        dt: Current time step size (unused in this model).

    Returns:
        Dictionary with 'tau' (SGS stress) and 'coeff' (Cs).
    """
    # Model constants
    ratio = c.sgs.TEST_FILTER_RATIO
    dx2 = self.dx**2
    ratio2 = ratio**2

    # Leonard stress L11 = <uu> - <u><u>
    np.square(u, out=self._scratch_a)                                    # u^2
    self.spectral.filter.cutoff(u, ratio, out=self._filt_a)              # uf
    self.spectral.filter.cutoff(self._scratch_a, ratio, out=self._filt_b)  # uuf
    np.square(self._filt_a, out=self._scratch_a)                         # uf^2
    np.subtract(self._filt_b, self._scratch_a, out=self._scratch_a)      # L11

    # Model tensor M11 (_filt_a/_filt_b now free for reuse)
    self.spectral.filter.cutoff(dudx, ratio, out=self._filt_a)           # dudxf
    np.abs(dudx, out=self._scratch_b)
    np.multiply(self._scratch_b, dudx, out=self._scratch_b)             # |dudx|*dudx
    self.spectral.filter.cutoff(self._scratch_b, ratio, out=self._filt_b)  # Tf
    np.abs(self._filt_a, out=self._scratch_b)
    np.multiply(self._scratch_b, self._filt_a, out=self._scratch_b)     # |dudxf|*dudxf
    np.multiply(ratio2, self._scratch_b, out=self._scratch_b)
    np.subtract(self._scratch_b, self._filt_b, out=self._scratch_b)
    np.multiply(dx2, self._scratch_b, out=self._scratch_b)              # M11

    # Dealiased strain rate
    dudx2 = self.spectral.dealias.compute(dudx)

    # Dynamic Smagorinsky coefficient
    np.multiply(self._scratch_b, self._scratch_b, out=self._scratch_c)
    M11_sq_mean = np.mean(self._scratch_c)
    if M11_sq_mean < 1e-30:
        cs2 = 0.0
    else:
        np.multiply(self._scratch_a, self._scratch_b, out=self._scratch_c)
        cs2 = -0.5 * np.mean(self._scratch_c) / M11_sq_mean
        if cs2 < 0:
            cs2 = 0.0

    np.multiply(-2 * cs2 * dx2, dudx2, out=self.result["tau"])
    self.result["coeff"] = np.sqrt(cs2)

    return self.result

WongLilly

WongLilly(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Dynamic Wong-Lilly subgrid-scale model.

A scale-similarity based SGS model that uses a different scaling for the SGS stress compared to the Smagorinsky model. The stress scales as dx^(4/3) rather than dx^2.

Uses the shared spectral workspace for filtering operations.

Initialize the Wong-Lilly model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Filter utility.

required
Source code in pyburgers/physics/sgs/sgs_wonglilly.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the Wong-Lilly model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Filter utility.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- using the Wong-Lilly model")

    # Pre-allocate scratch arrays to avoid temporaries in the hot loop
    nx = self.nx
    self._scratch_a = np.zeros(nx)
    self._scratch_b = np.zeros(nx)
    self._scratch_c = np.zeros(nx)
    self._filt_a = np.zeros(nx)  # filter.cutoff output buffer
    self._filt_b = np.zeros(nx)  # filter.cutoff output buffer

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the Wong-Lilly SGS stress.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Subgrid TKE (unused in this model).

required
dt float

Current time step size (unused in this model).

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress) and 'coeff' (C_WL).

Source code in pyburgers/physics/sgs/sgs_wonglilly.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the Wong-Lilly SGS stress.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient array.
        tke_sgs: Subgrid TKE (unused in this model).
        dt: Current time step size (unused in this model).

    Returns:
        Dictionary with 'tau' (SGS stress) and 'coeff' (C_WL).
    """

    # Model constants
    ratio = c.sgs.TEST_FILTER_RATIO
    exponent = c.sgs.WONGLILLY_EXPONENT
    dx_exp = self.dx**exponent
    ratio_pow = ratio**exponent

    # Leonard stress L11 = <uu> - <u><u>
    np.square(u, out=self._scratch_a)                                    # u^2
    self.spectral.filter.cutoff(u, ratio, out=self._filt_a)              # uf
    self.spectral.filter.cutoff(self._scratch_a, ratio, out=self._filt_b)  # uuf
    np.square(self._filt_a, out=self._scratch_a)                         # uf^2
    np.subtract(self._filt_b, self._scratch_a, out=self._scratch_a)      # L11

    # Model tensor M11 (Wong-Lilly scaling; _filt_a now free for reuse)
    self.spectral.filter.cutoff(dudx, ratio, out=self._filt_a)           # dudxf
    np.multiply(dx_exp * (1 - ratio_pow), self._filt_a, out=self._scratch_b)  # M11

    # Wong-Lilly coefficient
    np.square(self._scratch_b, out=self._scratch_c)
    M11_sq_mean = np.mean(self._scratch_c)
    if M11_sq_mean < 1e-30:
        cwl = 0.0
    else:
        np.multiply(self._scratch_a, self._scratch_b, out=self._scratch_c)
        cwl = 0.5 * np.mean(self._scratch_c) / M11_sq_mean
        if cwl < 0:
            cwl = 0.0

    np.multiply(-2 * cwl * dx_exp, dudx, out=self.result["tau"])
    self.result["coeff"] = cwl

    return self.result

Deardorff

Deardorff(input_obj: Input, spectral: SpectralWorkspace)

Bases: SGS

Deardorff 1.5-order TKE subgrid-scale model.

A prognostic SGS model that solves a transport equation for subgrid turbulent kinetic energy (TKE). The eddy viscosity is computed from the subgrid TKE as: nu_t = c1 * dx * sqrt(tke_sgs).

Uses the shared spectral workspace for dealiasing and derivative operations.

Initialize the Deardorff TKE model.

Parameters:

Name Type Description Default
input_obj Input

Input configuration object.

required
spectral SpectralWorkspace

SpectralWorkspace with shared Dealias and Derivatives utilities.

required
Source code in pyburgers/physics/sgs/sgs_deardorff.py
def __init__(self, input_obj: Input, spectral: SpectralWorkspace) -> None:
    """Initialize the Deardorff TKE model.

    Args:
        input_obj: Input configuration object.
        spectral: SpectralWorkspace with shared Dealias and Derivatives utilities.
    """
    super().__init__(input_obj, spectral)
    self.logger: logging.Logger = get_logger("SGS")
    self.logger.info("--- using the Deardorff TKE model")

    # Precompute reciprocal for hot-loop multiplication instead of division
    self._inv_dx = 1.0 / self.dx

    # Pre-allocate scratch arrays to avoid temporaries in the hot loop
    nx = self.nx
    self._dudx_snap = np.zeros(nx)
    self._dkdx = np.zeros(nx)
    self._dkudx = np.zeros(nx)
    self._tke_safe = np.zeros(nx)
    self._Vt = np.zeros(nx)
    self._scratch_a = np.zeros(nx)
    self._scratch_b = np.zeros(nx)
    self._tke_tendency = np.zeros(nx)

compute

compute(u: ndarray, dudx: ndarray, tke_sgs: ndarray | float, dt: float) -> dict[str, Any]

Compute the Deardorff SGS stress and update subgrid TKE.

Solves the prognostic TKE equation and computes the SGS stress from the updated subgrid TKE.

Parameters:

Name Type Description Default
u ndarray

Velocity field array.

required
dudx ndarray

Velocity gradient array.

required
tke_sgs ndarray | float

Current subgrid TKE array.

required
dt float

Current time step size for TKE tendency integration.

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'tau' (SGS stress), 'coeff' (c1),

dict[str, Any]

and 'tke_sgs' (updated subgrid TKE).

Source code in pyburgers/physics/sgs/sgs_deardorff.py
def compute(
    self, u: np.ndarray, dudx: np.ndarray, tke_sgs: np.ndarray | float, dt: float
) -> dict[str, Any]:
    """Compute the Deardorff SGS stress and update subgrid TKE.

    Solves the prognostic TKE equation and computes the SGS stress
    from the updated subgrid TKE.

    Args:
        u: Velocity field array.
        dudx: Velocity gradient array.
        tke_sgs: Current subgrid TKE array.
        dt: Current time step size for TKE tendency integration.

    Returns:
        Dictionary with 'tau' (SGS stress), 'coeff' (c1),
        and 'tke_sgs' (updated subgrid TKE).
    """

    # Model constants
    ce = c.sgs.DEARDORFF_CE  # Dissipation coefficient
    c1 = c.sgs.DEARDORFF_C1  # Eddy viscosity coefficient
    dx = self.dx

    # Snapshot dudx — the caller's array may alias the same
    # internal buffer that derivatives.compute() overwrites.
    self._dudx_snap[:] = dudx

    # Compute TKE gradients (copy each result before the next
    # compute() call overwrites the shared _out_1 buffer)
    derivs_k = self.spectral.derivatives.compute(tke_sgs, [1])
    self._dkdx[:] = derivs_k["1"]

    np.multiply(tke_sgs, u, out=self._scratch_a)
    derivs_ku = self.spectral.derivatives.compute(self._scratch_a, [1])
    self._dkudx[:] = derivs_ku["1"]

    # Eddy viscosity and SGS stress
    np.maximum(tke_sgs, 0.0, out=self._tke_safe)
    np.sqrt(self._tke_safe, out=self._Vt)
    np.multiply(c1 * dx, self._Vt, out=self._Vt)      # Vt = c1*dx*sqrt(tke_safe)
    np.multiply(-2.0, self._Vt, out=self._scratch_a)
    np.multiply(self._scratch_a, self._dudx_snap, out=self.result["tau"])  # tau

    # TKE diffusion term: d/dx(2*Vt*dkdx)
    np.multiply(self._Vt, self._dkdx, out=self._scratch_a)
    np.multiply(2.0, self._scratch_a, out=self._scratch_a)
    derivs_zz = self.spectral.derivatives.compute(self._scratch_a, [1])
    # dzzdx lives in the shared derivative buffer — used directly below

    # TKE tendency: -dkudx + production + diffusion - dissipation
    # Production: 2*Vt*dudx^2
    np.square(self._dudx_snap, out=self._scratch_a)
    np.multiply(self._Vt, self._scratch_a, out=self._scratch_a)
    np.multiply(2.0, self._scratch_a, out=self._scratch_a)  # prod
    prod_mean = float(np.mean(self._scratch_a))

    # Dissipation: -ce * tke_safe^1.5 / dx
    np.power(self._tke_safe, 1.5, out=self._scratch_b)
    np.multiply(-ce * self._inv_dx, self._scratch_b, out=self._scratch_b)  # diss
    diss_mean = float(np.mean(self._scratch_b))

    # Assemble tendency: -dkudx + prod + diff + diss
    diff = derivs_zz["1"]
    diff_mean = float(np.mean(diff))
    np.negative(self._dkudx, out=self._tke_tendency)
    np.add(self._tke_tendency, self._scratch_a, out=self._tke_tendency)
    np.add(self._tke_tendency, diff, out=self._tke_tendency)
    np.add(self._tke_tendency, self._scratch_b, out=self._tke_tendency)

    self.result["coeff"] = c1
    self.result["tke_tendency"] = self._tke_tendency
    self.result["tke_prod"] = prod_mean
    self.result["tke_diff"] = diff_mean
    self.result["tke_diss"] = diss_mean

    return self.result