CXR-Findings-AI / utils /layer_mask.py
manu02's picture
Update utils/layer_mask.py
9ee9ac7 verified
import torch
import torch.nn.functional as F
import math
import numpy as np
import matplotlib.pyplot as plt
@torch.no_grad()
def gaussian_layer_stack_pipeline(
x: torch.Tensor,
n_layers: int,
base_ksize: int = 3,
ksize_growth: int = 2,
sigma: float | None = None,
eps: float = 1e-8,
):
"""
All-in-one GPU batch pipeline:
1) Per-sample min-max normalize to [0,1]
2) Resize to (32,32)
3) Apply L Gaussian blurs with increasing kernel size in a single
horizontal conv + single vertical conv using depthwise groups
(via a shared max kernel padded with zeros)
4) Renormalize each layer to [0,1]
5) Return stacked (B,L,32,32), flat (B,L,1024), tiled (B,L,1024,1024 view)
Args:
x: (B,H,W) or (B,1,H,W) tensor (any device/dtype)
n_layers: number of layers
base_ksize: starting odd kernel size (e.g., 3)
ksize_growth: increment per layer (e.g., 2) -> ensures odd sizes
sigma: if None, uses (ksize-1)/6 per layer; else fixed sigma for all
eps: small number for safe division
Returns:
stacked: (B, n_layers, 32, 32) float on x.device
flat: (B, n_layers, 1024)
tiled: (B, n_layers, 1024, 1024) (expand view; memory-cheap)
"""
assert n_layers >= 1, "n_layers must be >= 1"
# ---- Ensure 4D, 1 channel; cast to float (stay on same device) ----
if x.ndim == 3:
x = x.unsqueeze(1) # (B,1,H,W)
elif x.ndim != 4 or x.shape[1] not in (1,):
raise ValueError(f"Expected (B,H,W) or (B,1,H,W); got {tuple(x.shape)}")
x = x.float()
B, _, H, W = x.shape
# ---- Per-sample min-max normalize to [0,1] ----
xmin = x.amin(dim=(2, 3), keepdim=True)
xmax = x.amax(dim=(2, 3), keepdim=True)
denom = (xmax - xmin).clamp_min(eps)
x = (x - xmin) / denom # (B,1,H,W) in [0,1]
# ---- Resize to 32x32 on GPU ----
x = F.interpolate(x, size=(32, 32), mode="bilinear", align_corners=False) # (B,1,32,32)
# ---- Prepare per-layer kernel sizes (odd) ----
ksizes = []
for i in range(n_layers, 0, -1): # to keep your original ordering: L...1
k = base_ksize + i * ksize_growth
k = int(k)
if k % 2 == 0:
k += 1
k = max(k, 1)
ksizes.append(k)
Kmax = max(ksizes)
pad = Kmax // 2
# ---- Build per-layer 1D Gaussian vectors and embed into shared Kmax kernel ----
# We create horizontal weights of shape (L,1,1,Kmax) and vertical (L,1,Kmax,1)
device, dtype = x.device, x.dtype
weight_h = torch.zeros((n_layers, 1, 1, Kmax), device=device, dtype=dtype)
weight_v = torch.zeros((n_layers, 1, Kmax, 1), device=device, dtype=dtype)
for idx, k in enumerate(ksizes):
# choose sigma
sig = sigma if (sigma is not None and sigma > 0) else (k - 1) / 6.0
r = k // 2
xp = torch.arange(-r, r + 1, device=device, dtype=dtype)
g = torch.exp(-(xp * xp) / (2.0 * sig * sig))
g = g / g.sum() # (k,)
# center g into Kmax with zeros around
start = (Kmax - k) // 2
end = start + k
# horizontal row
weight_h[idx, 0, 0, start:end] = g # (1 x Kmax)
# vertical column
weight_v[idx, 0, start:end, 0] = g # (Kmax x 1)
# ---- Duplicate input across L channels (depthwise groups) ----
xL = x.expand(B, n_layers, 32, 32).contiguous() # (B,L,32,32)
# ---- Separable Gaussian blur with a single pass per axis (groups=L) ----
# Horizontal
xh = F.pad(xL, (pad, pad, 0, 0), mode="reflect")
xh = F.conv2d(xh, weight=weight_h, bias=None, stride=1, padding=0, groups=n_layers) # (B,L,32,32)
# Vertical
xv = F.pad(xh, (0, 0, pad, pad), mode="reflect")
yL = F.conv2d(xv, weight=weight_v, bias=None, stride=1, padding=0, groups=n_layers) # (B,L,32,32)
# ---- Renormalize each layer to [0,1] (per-sample, per-layer) ----
y_min = yL.amin(dim=(2, 3), keepdim=True)
y_max = yL.amax(dim=(2, 3), keepdim=True)
y_den = (y_max - y_min).clamp_min(eps)
stacked = (yL - y_min) / y_den # (B,L,32,32) in [0,1]
# ---- Flatten + tile (expand view; caution w/ later materialization) ----
flat = stacked.reshape(B, n_layers, 32 * 32) # (B,L,1024)
tiled = flat.unsqueeze(-2).expand(-1, -1, 2 * 32 * 32, -1) # (B,L,1024,1024) view
return stacked, flat, tiled
def plot_layers_any(
x,
*,
max_batches=None,
vlim=(0, 1),
one_indexed: bool = False,
max_cols: int = 6,
):
"""
Plot layers for each batch sample in separate figures.
Accepts:
- stacked: (B, L, H, W)
- flat: (B, L, HW)
- tiled: (B, L, HW, HW)
Behavior:
- Creates one figure PER BATCH (up to `max_batches`).
- At most `max_cols` layers per row (default 6).
- Column headers: 'Layer {i}' descending from n-1 -> 0 (or n -> 1 if one_indexed=True).
- Figure title per batch: 'Masks for input {i} out of {B}'.
Returns:
A list of (fig, axes) tuples, one per plotted batch.
"""
# ---- Normalize input to torch ----
if isinstance(x, np.ndarray):
x = torch.from_numpy(x)
if not isinstance(x, torch.Tensor):
raise TypeError(f"Expected torch.Tensor or np.ndarray, got {type(x)}")
if x.ndim not in (3, 4):
raise ValueError(f"Expected ndim 3 or 4, got shape {tuple(x.shape)}")
# ---- Convert to (B, L, H, W) 'stacked' ----
if x.ndim == 4:
B, L, A, B_ = x.shape
if A == B_:
# Could be stacked (H==W) or tiled (HW x HW). Heuristic: if A is a perfect square
# and reasonably large (e.g., 1024), treat as tiled and collapse to flat.
s = int(math.isqrt(A))
if s * s == A and A >= 64:
flat = x[..., 0, :].detach() # (B, L, HW)
H = W = s
stacked = flat.reshape(B, L, H, W)
else:
stacked = x.detach()
else:
stacked = x.detach()
else:
# x.ndim == 3 -> (B, L, HW)
B, L, HW = x.shape
s = int(math.isqrt(HW))
if s * s != HW:
if HW != 32 * 32:
raise ValueError(
f"Cannot infer square image size from HW={HW}. "
f"Provide stacked (B,L,H,W) or flat with square HW."
)
s = 32
H = W = s
stacked = x.detach().reshape(B, L, H, W)
# Ensure float & CPU for plotting
stacked = stacked.to(torch.float32).cpu().numpy()
# ---- Batch selection ----
B, L, H, W = stacked.shape
plot_B = B if max_batches is None else max(1, min(B, int(max_batches)))
# ---- Layout params ----
cols = max(1, int(max_cols))
rows_needed = lambda L: (L + cols - 1) // cols
figs = []
for b in range(plot_B):
# number of rows for this batch
r = rows_needed(L)
fig, axes = plt.subplots(r, cols, figsize=(cols * 3, r * 3), squeeze=False)
fig.suptitle(f"Masks for input {b} out of {B}", fontsize=12, y=1.02)
for l in range(L):
rr = l // cols
cc = l % cols
ax = axes[rr, cc]
if vlim is None:
ax.imshow(stacked[b, l], cmap="gray")
else:
ax.imshow(stacked[b, l], cmap="gray", vmin=vlim[0], vmax=vlim[1])
ax.axis("off")
# Set column titles only on the first row of the grid
label_num = (l + 1) if one_indexed else l
ax.set_title(f"Layer {label_num}", fontsize=10)
# Hide any unused axes (when L is not a multiple of cols)
total_slots = r * cols
for empty_idx in range(L, total_slots):
rr = empty_idx // cols
cc = empty_idx % cols
axes[rr, cc].axis("off")
plt.tight_layout()
figs.append((fig, axes))
return figs