manu02 commited on
Commit
9ee9ac7
·
verified ·
1 Parent(s): 97397b1

Update utils/layer_mask.py

Browse files
Files changed (1) hide show
  1. utils/layer_mask.py +224 -224
utils/layer_mask.py CHANGED
@@ -1,224 +1,224 @@
1
- import torch
2
- import torch.nn.functional as F
3
- import math
4
- import numpy as np
5
- import matplotlib.pyplot as plt
6
-
7
- @torch.no_grad()
8
- def gaussian_layer_stack_pipeline(
9
- x: torch.Tensor,
10
- n_layers: int,
11
- base_ksize: int = 3,
12
- ksize_growth: int = 2,
13
- sigma: float | None = None,
14
- eps: float = 1e-8,
15
- ):
16
- """
17
- All-in-one GPU batch pipeline:
18
- 1) Per-sample min-max normalize to [0,1]
19
- 2) Resize to (32,32)
20
- 3) Apply L Gaussian blurs with increasing kernel size in a single
21
- horizontal conv + single vertical conv using depthwise groups
22
- (via a shared max kernel padded with zeros)
23
- 4) Renormalize each layer to [0,1]
24
- 5) Return stacked (B,L,32,32), flat (B,L,1024), tiled (B,L,1024,1024 view)
25
-
26
- Args:
27
- x: (B,H,W) or (B,1,H,W) tensor (any device/dtype)
28
- n_layers: number of layers
29
- base_ksize: starting odd kernel size (e.g., 3)
30
- ksize_growth: increment per layer (e.g., 2) -> ensures odd sizes
31
- sigma: if None, uses (ksize-1)/6 per layer; else fixed sigma for all
32
- eps: small number for safe division
33
-
34
- Returns:
35
- stacked: (B, n_layers, 32, 32) float on x.device
36
- flat: (B, n_layers, 1024)
37
- tiled: (B, n_layers, 1024, 1024) (expand view; memory-cheap)
38
- """
39
- assert n_layers >= 1, "n_layers must be >= 1"
40
-
41
- # ---- Ensure 4D, 1 channel; cast to float (stay on same device) ----
42
- if x.ndim == 3:
43
- x = x.unsqueeze(1) # (B,1,H,W)
44
- elif x.ndim != 4 or x.shape[1] not in (1,):
45
- raise ValueError(f"Expected (B,H,W) or (B,1,H,W); got {tuple(x.shape)}")
46
- x = x.float()
47
-
48
- B, _, H, W = x.shape
49
-
50
- # ---- Per-sample min-max normalize to [0,1] ----
51
- xmin = x.amin(dim=(2, 3), keepdim=True)
52
- xmax = x.amax(dim=(2, 3), keepdim=True)
53
- denom = (xmax - xmin).clamp_min(eps)
54
- x = (x - xmin) / denom # (B,1,H,W) in [0,1]
55
-
56
- # ---- Resize to 32x32 on GPU ----
57
- x = F.interpolate(x, size=(32, 32), mode="bilinear", align_corners=False) # (B,1,32,32)
58
-
59
- # ---- Prepare per-layer kernel sizes (odd) ----
60
- ksizes = []
61
- for i in range(n_layers, 0, -1): # to keep your original ordering: L...1
62
- k = base_ksize + i * ksize_growth
63
- k = int(k)
64
- if k % 2 == 0:
65
- k += 1
66
- k = max(k, 1)
67
- ksizes.append(k)
68
-
69
- Kmax = max(ksizes)
70
- pad = Kmax // 2
71
-
72
- # ---- Build per-layer 1D Gaussian vectors and embed into shared Kmax kernel ----
73
- # We create horizontal weights of shape (L,1,1,Kmax) and vertical (L,1,Kmax,1)
74
- device, dtype = x.device, x.dtype
75
- weight_h = torch.zeros((n_layers, 1, 1, Kmax), device=device, dtype=dtype)
76
- weight_v = torch.zeros((n_layers, 1, Kmax, 1), device=device, dtype=dtype)
77
-
78
- for idx, k in enumerate(ksizes):
79
- # choose sigma
80
- sig = sigma if (sigma is not None and sigma > 0) else (k - 1) / 6.0
81
- r = k // 2
82
- xp = torch.arange(-r, r + 1, device=device, dtype=dtype)
83
- g = torch.exp(-(xp * xp) / (2.0 * sig * sig))
84
- g = g / g.sum() # (k,)
85
-
86
- # center g into Kmax with zeros around
87
- start = (Kmax - k) // 2
88
- end = start + k
89
-
90
- # horizontal row
91
- weight_h[idx, 0, 0, start:end] = g # (1 x Kmax)
92
-
93
- # vertical column
94
- weight_v[idx, 0, start:end, 0] = g # (Kmax x 1)
95
-
96
- # ---- Duplicate input across L channels (depthwise groups) ----
97
- xL = x.expand(B, n_layers, 32, 32).contiguous() # (B,L,32,32)
98
-
99
- # ---- Separable Gaussian blur with a single pass per axis (groups=L) ----
100
- # Horizontal
101
- xh = F.pad(xL, (pad, pad, 0, 0), mode="reflect")
102
- xh = F.conv2d(xh, weight=weight_h, bias=None, stride=1, padding=0, groups=n_layers) # (B,L,32,32)
103
-
104
- # Vertical
105
- xv = F.pad(xh, (0, 0, pad, pad), mode="reflect")
106
- yL = F.conv2d(xv, weight=weight_v, bias=None, stride=1, padding=0, groups=n_layers) # (B,L,32,32)
107
-
108
- # ---- Renormalize each layer to [0,1] (per-sample, per-layer) ----
109
- y_min = yL.amin(dim=(2, 3), keepdim=True)
110
- y_max = yL.amax(dim=(2, 3), keepdim=True)
111
- y_den = (y_max - y_min).clamp_min(eps)
112
- stacked = (yL - y_min) / y_den # (B,L,32,32) in [0,1]
113
-
114
- # ---- Flatten + tile (expand view; caution w/ later materialization) ----
115
- flat = stacked.reshape(B, n_layers, 32 * 32) # (B,L,1024)
116
- tiled = flat.unsqueeze(-2).expand(-1, -1, 32 * 32, -1) # (B,L,1024,1024) view
117
-
118
- return stacked, flat, tiled
119
-
120
- def plot_layers_any(
121
- x,
122
- *,
123
- max_batches=None,
124
- vlim=(0, 1),
125
- one_indexed: bool = False,
126
- max_cols: int = 6,
127
- ):
128
- """
129
- Plot layers for each batch sample in separate figures.
130
-
131
- Accepts:
132
- - stacked: (B, L, H, W)
133
- - flat: (B, L, HW)
134
- - tiled: (B, L, HW, HW)
135
-
136
- Behavior:
137
- - Creates one figure PER BATCH (up to `max_batches`).
138
- - At most `max_cols` layers per row (default 6).
139
- - Column headers: 'Layer {i}' descending from n-1 -> 0 (or n -> 1 if one_indexed=True).
140
- - Figure title per batch: 'Masks for input {i} out of {B}'.
141
-
142
- Returns:
143
- A list of (fig, axes) tuples, one per plotted batch.
144
- """
145
- # ---- Normalize input to torch ----
146
- if isinstance(x, np.ndarray):
147
- x = torch.from_numpy(x)
148
- if not isinstance(x, torch.Tensor):
149
- raise TypeError(f"Expected torch.Tensor or np.ndarray, got {type(x)}")
150
-
151
- if x.ndim not in (3, 4):
152
- raise ValueError(f"Expected ndim 3 or 4, got shape {tuple(x.shape)}")
153
-
154
- # ---- Convert to (B, L, H, W) 'stacked' ----
155
- if x.ndim == 4:
156
- B, L, A, B_ = x.shape
157
- if A == B_:
158
- # Could be stacked (H==W) or tiled (HW x HW). Heuristic: if A is a perfect square
159
- # and reasonably large (e.g., 1024), treat as tiled and collapse to flat.
160
- s = int(math.isqrt(A))
161
- if s * s == A and A >= 64:
162
- flat = x[..., 0, :].detach() # (B, L, HW)
163
- H = W = s
164
- stacked = flat.reshape(B, L, H, W)
165
- else:
166
- stacked = x.detach()
167
- else:
168
- stacked = x.detach()
169
- else:
170
- # x.ndim == 3 -> (B, L, HW)
171
- B, L, HW = x.shape
172
- s = int(math.isqrt(HW))
173
- if s * s != HW:
174
- if HW != 32 * 32:
175
- raise ValueError(
176
- f"Cannot infer square image size from HW={HW}. "
177
- f"Provide stacked (B,L,H,W) or flat with square HW."
178
- )
179
- s = 32
180
- H = W = s
181
- stacked = x.detach().reshape(B, L, H, W)
182
-
183
- # Ensure float & CPU for plotting
184
- stacked = stacked.to(torch.float32).cpu().numpy()
185
-
186
- # ---- Batch selection ----
187
- B, L, H, W = stacked.shape
188
- plot_B = B if max_batches is None else max(1, min(B, int(max_batches)))
189
-
190
- # ---- Layout params ----
191
- cols = max(1, int(max_cols))
192
- rows_needed = lambda L: (L + cols - 1) // cols
193
-
194
- figs = []
195
- for b in range(plot_B):
196
- # number of rows for this batch
197
- r = rows_needed(L)
198
- fig, axes = plt.subplots(r, cols, figsize=(cols * 3, r * 3), squeeze=False)
199
- fig.suptitle(f"Masks for input {b} out of {B}", fontsize=12, y=1.02)
200
-
201
- for l in range(L):
202
- rr = l // cols
203
- cc = l % cols
204
- ax = axes[rr, cc]
205
- if vlim is None:
206
- ax.imshow(stacked[b, l], cmap="gray")
207
- else:
208
- ax.imshow(stacked[b, l], cmap="gray", vmin=vlim[0], vmax=vlim[1])
209
- ax.axis("off")
210
-
211
- # Set column titles only on the first row of the grid
212
- label_num = (l + 1) if one_indexed else l
213
- ax.set_title(f"Layer {label_num}", fontsize=10)
214
-
215
- # Hide any unused axes (when L is not a multiple of cols)
216
- total_slots = r * cols
217
- for empty_idx in range(L, total_slots):
218
- rr = empty_idx // cols
219
- cc = empty_idx % cols
220
- axes[rr, cc].axis("off")
221
-
222
- plt.tight_layout()
223
- figs.append((fig, axes))
224
- return figs
 
1
+ import torch
2
+ import torch.nn.functional as F
3
+ import math
4
+ import numpy as np
5
+ import matplotlib.pyplot as plt
6
+
7
+ @torch.no_grad()
8
+ def gaussian_layer_stack_pipeline(
9
+ x: torch.Tensor,
10
+ n_layers: int,
11
+ base_ksize: int = 3,
12
+ ksize_growth: int = 2,
13
+ sigma: float | None = None,
14
+ eps: float = 1e-8,
15
+ ):
16
+ """
17
+ All-in-one GPU batch pipeline:
18
+ 1) Per-sample min-max normalize to [0,1]
19
+ 2) Resize to (32,32)
20
+ 3) Apply L Gaussian blurs with increasing kernel size in a single
21
+ horizontal conv + single vertical conv using depthwise groups
22
+ (via a shared max kernel padded with zeros)
23
+ 4) Renormalize each layer to [0,1]
24
+ 5) Return stacked (B,L,32,32), flat (B,L,1024), tiled (B,L,1024,1024 view)
25
+
26
+ Args:
27
+ x: (B,H,W) or (B,1,H,W) tensor (any device/dtype)
28
+ n_layers: number of layers
29
+ base_ksize: starting odd kernel size (e.g., 3)
30
+ ksize_growth: increment per layer (e.g., 2) -> ensures odd sizes
31
+ sigma: if None, uses (ksize-1)/6 per layer; else fixed sigma for all
32
+ eps: small number for safe division
33
+
34
+ Returns:
35
+ stacked: (B, n_layers, 32, 32) float on x.device
36
+ flat: (B, n_layers, 1024)
37
+ tiled: (B, n_layers, 1024, 1024) (expand view; memory-cheap)
38
+ """
39
+ assert n_layers >= 1, "n_layers must be >= 1"
40
+
41
+ # ---- Ensure 4D, 1 channel; cast to float (stay on same device) ----
42
+ if x.ndim == 3:
43
+ x = x.unsqueeze(1) # (B,1,H,W)
44
+ elif x.ndim != 4 or x.shape[1] not in (1,):
45
+ raise ValueError(f"Expected (B,H,W) or (B,1,H,W); got {tuple(x.shape)}")
46
+ x = x.float()
47
+
48
+ B, _, H, W = x.shape
49
+
50
+ # ---- Per-sample min-max normalize to [0,1] ----
51
+ xmin = x.amin(dim=(2, 3), keepdim=True)
52
+ xmax = x.amax(dim=(2, 3), keepdim=True)
53
+ denom = (xmax - xmin).clamp_min(eps)
54
+ x = (x - xmin) / denom # (B,1,H,W) in [0,1]
55
+
56
+ # ---- Resize to 32x32 on GPU ----
57
+ x = F.interpolate(x, size=(32, 32), mode="bilinear", align_corners=False) # (B,1,32,32)
58
+
59
+ # ---- Prepare per-layer kernel sizes (odd) ----
60
+ ksizes = []
61
+ for i in range(n_layers, 0, -1): # to keep your original ordering: L...1
62
+ k = base_ksize + i * ksize_growth
63
+ k = int(k)
64
+ if k % 2 == 0:
65
+ k += 1
66
+ k = max(k, 1)
67
+ ksizes.append(k)
68
+
69
+ Kmax = max(ksizes)
70
+ pad = Kmax // 2
71
+
72
+ # ---- Build per-layer 1D Gaussian vectors and embed into shared Kmax kernel ----
73
+ # We create horizontal weights of shape (L,1,1,Kmax) and vertical (L,1,Kmax,1)
74
+ device, dtype = x.device, x.dtype
75
+ weight_h = torch.zeros((n_layers, 1, 1, Kmax), device=device, dtype=dtype)
76
+ weight_v = torch.zeros((n_layers, 1, Kmax, 1), device=device, dtype=dtype)
77
+
78
+ for idx, k in enumerate(ksizes):
79
+ # choose sigma
80
+ sig = sigma if (sigma is not None and sigma > 0) else (k - 1) / 6.0
81
+ r = k // 2
82
+ xp = torch.arange(-r, r + 1, device=device, dtype=dtype)
83
+ g = torch.exp(-(xp * xp) / (2.0 * sig * sig))
84
+ g = g / g.sum() # (k,)
85
+
86
+ # center g into Kmax with zeros around
87
+ start = (Kmax - k) // 2
88
+ end = start + k
89
+
90
+ # horizontal row
91
+ weight_h[idx, 0, 0, start:end] = g # (1 x Kmax)
92
+
93
+ # vertical column
94
+ weight_v[idx, 0, start:end, 0] = g # (Kmax x 1)
95
+
96
+ # ---- Duplicate input across L channels (depthwise groups) ----
97
+ xL = x.expand(B, n_layers, 32, 32).contiguous() # (B,L,32,32)
98
+
99
+ # ---- Separable Gaussian blur with a single pass per axis (groups=L) ----
100
+ # Horizontal
101
+ xh = F.pad(xL, (pad, pad, 0, 0), mode="reflect")
102
+ xh = F.conv2d(xh, weight=weight_h, bias=None, stride=1, padding=0, groups=n_layers) # (B,L,32,32)
103
+
104
+ # Vertical
105
+ xv = F.pad(xh, (0, 0, pad, pad), mode="reflect")
106
+ yL = F.conv2d(xv, weight=weight_v, bias=None, stride=1, padding=0, groups=n_layers) # (B,L,32,32)
107
+
108
+ # ---- Renormalize each layer to [0,1] (per-sample, per-layer) ----
109
+ y_min = yL.amin(dim=(2, 3), keepdim=True)
110
+ y_max = yL.amax(dim=(2, 3), keepdim=True)
111
+ y_den = (y_max - y_min).clamp_min(eps)
112
+ stacked = (yL - y_min) / y_den # (B,L,32,32) in [0,1]
113
+
114
+ # ---- Flatten + tile (expand view; caution w/ later materialization) ----
115
+ flat = stacked.reshape(B, n_layers, 32 * 32) # (B,L,1024)
116
+ tiled = flat.unsqueeze(-2).expand(-1, -1, 2 * 32 * 32, -1) # (B,L,1024,1024) view
117
+
118
+ return stacked, flat, tiled
119
+
120
+ def plot_layers_any(
121
+ x,
122
+ *,
123
+ max_batches=None,
124
+ vlim=(0, 1),
125
+ one_indexed: bool = False,
126
+ max_cols: int = 6,
127
+ ):
128
+ """
129
+ Plot layers for each batch sample in separate figures.
130
+
131
+ Accepts:
132
+ - stacked: (B, L, H, W)
133
+ - flat: (B, L, HW)
134
+ - tiled: (B, L, HW, HW)
135
+
136
+ Behavior:
137
+ - Creates one figure PER BATCH (up to `max_batches`).
138
+ - At most `max_cols` layers per row (default 6).
139
+ - Column headers: 'Layer {i}' descending from n-1 -> 0 (or n -> 1 if one_indexed=True).
140
+ - Figure title per batch: 'Masks for input {i} out of {B}'.
141
+
142
+ Returns:
143
+ A list of (fig, axes) tuples, one per plotted batch.
144
+ """
145
+ # ---- Normalize input to torch ----
146
+ if isinstance(x, np.ndarray):
147
+ x = torch.from_numpy(x)
148
+ if not isinstance(x, torch.Tensor):
149
+ raise TypeError(f"Expected torch.Tensor or np.ndarray, got {type(x)}")
150
+
151
+ if x.ndim not in (3, 4):
152
+ raise ValueError(f"Expected ndim 3 or 4, got shape {tuple(x.shape)}")
153
+
154
+ # ---- Convert to (B, L, H, W) 'stacked' ----
155
+ if x.ndim == 4:
156
+ B, L, A, B_ = x.shape
157
+ if A == B_:
158
+ # Could be stacked (H==W) or tiled (HW x HW). Heuristic: if A is a perfect square
159
+ # and reasonably large (e.g., 1024), treat as tiled and collapse to flat.
160
+ s = int(math.isqrt(A))
161
+ if s * s == A and A >= 64:
162
+ flat = x[..., 0, :].detach() # (B, L, HW)
163
+ H = W = s
164
+ stacked = flat.reshape(B, L, H, W)
165
+ else:
166
+ stacked = x.detach()
167
+ else:
168
+ stacked = x.detach()
169
+ else:
170
+ # x.ndim == 3 -> (B, L, HW)
171
+ B, L, HW = x.shape
172
+ s = int(math.isqrt(HW))
173
+ if s * s != HW:
174
+ if HW != 32 * 32:
175
+ raise ValueError(
176
+ f"Cannot infer square image size from HW={HW}. "
177
+ f"Provide stacked (B,L,H,W) or flat with square HW."
178
+ )
179
+ s = 32
180
+ H = W = s
181
+ stacked = x.detach().reshape(B, L, H, W)
182
+
183
+ # Ensure float & CPU for plotting
184
+ stacked = stacked.to(torch.float32).cpu().numpy()
185
+
186
+ # ---- Batch selection ----
187
+ B, L, H, W = stacked.shape
188
+ plot_B = B if max_batches is None else max(1, min(B, int(max_batches)))
189
+
190
+ # ---- Layout params ----
191
+ cols = max(1, int(max_cols))
192
+ rows_needed = lambda L: (L + cols - 1) // cols
193
+
194
+ figs = []
195
+ for b in range(plot_B):
196
+ # number of rows for this batch
197
+ r = rows_needed(L)
198
+ fig, axes = plt.subplots(r, cols, figsize=(cols * 3, r * 3), squeeze=False)
199
+ fig.suptitle(f"Masks for input {b} out of {B}", fontsize=12, y=1.02)
200
+
201
+ for l in range(L):
202
+ rr = l // cols
203
+ cc = l % cols
204
+ ax = axes[rr, cc]
205
+ if vlim is None:
206
+ ax.imshow(stacked[b, l], cmap="gray")
207
+ else:
208
+ ax.imshow(stacked[b, l], cmap="gray", vmin=vlim[0], vmax=vlim[1])
209
+ ax.axis("off")
210
+
211
+ # Set column titles only on the first row of the grid
212
+ label_num = (l + 1) if one_indexed else l
213
+ ax.set_title(f"Layer {label_num}", fontsize=10)
214
+
215
+ # Hide any unused axes (when L is not a multiple of cols)
216
+ total_slots = r * cols
217
+ for empty_idx in range(L, total_slots):
218
+ rr = empty_idx // cols
219
+ cc = empty_idx % cols
220
+ axes[rr, cc].axis("off")
221
+
222
+ plt.tight_layout()
223
+ figs.append((fig, axes))
224
+ return figs