# -*- coding: utf-8 -*-
"""
款式选择器 v7 (Style Selector v7)
- Supports WPS in-cell embedded images (DISPIMG)
- Supports traditional floating images (Drawing)
- Supports merged cells (one style, multiple colors)
- Dynamic column detection
"""
import gradio as gr
import pandas as pd
import openpyxl
from openpyxl.drawing.image import Image as XLImage
from PIL import Image
import io
import base64
import os
import tempfile
from datetime import datetime
import zipfile
import re
from xml.etree import ElementTree as ET

def extract_floating_images(file_path):
    """Extract traditional floating images (Drawing-based)."""
    images = {}
    try:
        with zipfile.ZipFile(file_path, 'r') as zf:
            # 1. Read the media files
            media_files = {}
            for name in zf.namelist():
                if name.startswith('xl/media/'):
                    img_name = os.path.basename(name)
                    media_files[img_name] = zf.read(name)
            if not media_files:
                return images
            # 2. Get the sheet names
            sheet_names = []
            if 'xl/workbook.xml' in zf.namelist():
                wb_xml = zf.read('xl/workbook.xml').decode('utf-8')
                for match in re.finditer(r'<sheet[^>]*name="([^"]*)"', wb_xml):
                    sheet_names.append(match.group(1))
            # 3. Walk each sheet's drawing relationships
            for sheet_idx in range(1, 20):
                sheet_rels_path = f'xl/worksheets/_rels/sheet{sheet_idx}.xml.rels'
                if sheet_rels_path not in zf.namelist():
                    continue
                sheet_name = sheet_names[sheet_idx - 1] if sheet_idx <= len(sheet_names) else f'Sheet{sheet_idx}'
                # Find the associated drawing file
                rels_xml = zf.read(sheet_rels_path).decode('utf-8')
                drawing_match = re.search(r'(drawing\d+)\.xml', rels_xml)
                if not drawing_match:
                    continue
                drawing_name = drawing_match.group(1)
                drawing_path = f'xl/drawings/{drawing_name}.xml'
                drawing_rels_path = f'xl/drawings/_rels/{drawing_name}.xml.rels'
                if drawing_path not in zf.namelist():
                    continue
                # 4. Parse the drawing's rels to map each rId to its image data
                rid_to_image = {}
                if drawing_rels_path in zf.namelist():
                    drels_xml = zf.read(drawing_rels_path).decode('utf-8')
                    # Match Relationship tags and pull out Id and Target (attribute order is not fixed)
                    for rel_tag in re.finditer(r'<Relationship[^>]+/?>', drels_xml):
                        tag_content = rel_tag.group(0)
                        id_match = re.search(r'Id="(rId\d+)"', tag_content)
                        target_match = re.search(r'Target="([^"]*)"', tag_content)
                        if id_match and target_match:
                            rid = id_match.group(1)
                            target = target_match.group(1)
                            if 'media/' in target:
                                img_name = os.path.basename(target)
                                if img_name in media_files:
                                    rid_to_image[rid] = media_files[img_name]
                # 5. Parse drawing.xml to get the image positions
                drawing_xml = zf.read(drawing_path).decode('utf-8')
                # Match oneCellAnchor or twoCellAnchor (namespace prefixes such as xdr: are supported)
                # <xdr:from><xdr:row>1</xdr:row>...</xdr:from>
                anchor_pattern = r'<(?:\w+:)?(?:oneCellAnchor|twoCellAnchor)[^>]*>.*?<(?:\w+:)?from>.*?<(?:\w+:)?row>(\d+)</(?:\w+:)?row>.*?</(?:\w+:)?from>.*?r:embed="(rId\d+)".*?</(?:\w+:)?(?:oneCellAnchor|twoCellAnchor)>'
                for anchor_match in re.finditer(anchor_pattern, drawing_xml, re.DOTALL):
                    row = int(anchor_match.group(1)) + 1  # 0-indexed -> 1-indexed
                    rid = anchor_match.group(2)
                    if rid in rid_to_image:
                        key = (sheet_name, row)
                        if key not in images:
                            images[key] = rid_to_image[rid]
    except Exception as e:
        print(f"提取浮动图片出错: {e}")
        import traceback
        traceback.print_exc()
    return images
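
# Illustrative sketch (not taken from a real file) of the drawing XML shape that
# anchor_pattern above is written against; tag names and prefixes can vary by producer:
#
#   <xdr:twoCellAnchor editAs="oneCell">
#     <xdr:from>
#       <xdr:col>2</xdr:col><xdr:colOff>0</xdr:colOff>
#       <xdr:row>5</xdr:row><xdr:rowOff>0</xdr:rowOff>
#     </xdr:from>
#     ...
#     <xdr:pic>...<a:blip r:embed="rId1"/>...</xdr:pic>
#   </xdr:twoCellAnchor>
#
# <xdr:row> is 0-based, hence the "+ 1" above; r:embed points into the drawing's
# .rels file, which is why rid_to_image is built first.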

def extract_wps_cellimages(file_path):
    """Extract images that WPS embeds directly into cells."""
    images = {}
    image_id_to_data = {}
    try:
        with zipfile.ZipFile(file_path, 'r') as zf:
            # 1. Read the media files
            media_files = {}
            for name in zf.namelist():
                if name.startswith('xl/media/'):
                    img_name = os.path.basename(name)
                    media_files[img_name] = zf.read(name)
            # 2. Parse cellimages.xml.rels
            rid_to_image = {}
            if 'xl/_rels/cellimages.xml.rels' in zf.namelist():
                rels_xml = zf.read('xl/_rels/cellimages.xml.rels').decode('utf-8')
                rels_root = ET.fromstring(rels_xml)
                for rel in rels_root:
                    rid = rel.get('Id')
                    target = rel.get('Target')
                    if rid and target and 'media/' in target:
                        img_name = os.path.basename(target)
                        if img_name in media_files:
                            rid_to_image[rid] = media_files[img_name]
            # 3. Parse cellimages.xml
            if 'xl/cellimages.xml' in zf.namelist():
                cellimages_xml = zf.read('xl/cellimages.xml').decode('utf-8')
                namespaces = {
                    'etc': 'http://www.wps.cn/officeDocument/2017/etCustomData',
                    'xdr': 'http://schemas.openxmlformats.org/drawingml/2006/spreadsheetDrawing',
                    'a': 'http://schemas.openxmlformats.org/drawingml/2006/main',
                    'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'
                }
                root = ET.fromstring(cellimages_xml)
                for cellImage in root.findall('.//etc:cellImage', namespaces):
                    pic = cellImage.find('.//xdr:pic', namespaces)
                    if pic is not None:
                        cNvPr = pic.find('.//xdr:cNvPr', namespaces)
                        if cNvPr is not None:
                            image_id = cNvPr.get('name')
                            blip = pic.find('.//a:blip', namespaces)
                            if blip is not None:
                                rid = blip.get('{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed')
                                if image_id and rid and rid in rid_to_image:
                                    image_id_to_data[image_id] = rid_to_image[rid]
            # 4. Get the sheet names
            sheet_names = []
            if 'xl/workbook.xml' in zf.namelist():
                wb_xml = zf.read('xl/workbook.xml').decode('utf-8')
                for match in re.finditer(r'<sheet[^>]*name="([^"]*)"', wb_xml):
                    sheet_names.append(match.group(1))
            # 5. Parse each sheet's DISPIMG references
            for sheet_idx in range(1, 20):
                sheet_path = f'xl/worksheets/sheet{sheet_idx}.xml'
                if sheet_path not in zf.namelist():
                    break
                sheet_name = sheet_names[sheet_idx - 1] if sheet_idx <= len(sheet_names) else f'Sheet{sheet_idx}'
                sheet_xml = zf.read(sheet_path).decode('utf-8')
                # Fix: handle both cell serializations
                # 1. Self-closing: <c r="A1" s="9"/>
                # 2. Normally closed: <c r="A1" ...>...</c>
                # Use a stricter regex that only matches normally closed cells
                # (a cell containing DISPIMG is always normally closed)
                cell_pattern = r'<c r="([A-Z]+)(\d+)"[^/>]*>([^<]*(?:<(?!/c>)[^<]*)*)</c>'
                for cell_match in re.finditer(cell_pattern, sheet_xml, re.DOTALL):
                    cell_content = cell_match.group(0)
                    if 'DISPIMG' in cell_content:
                        col = cell_match.group(1)
                        row = int(cell_match.group(2))
                        img_match = re.search(r'DISPIMG\("([^&]+)"', cell_content)
                        if img_match:
                            image_id = img_match.group(1)
                            if image_id in image_id_to_data:
                                key = (sheet_name, row)
                                if key not in images:
                                    images[key] = image_id_to_data[image_id]
    except Exception as e:
        print(f"提取图片出错: {e}")
        import traceback
        traceback.print_exc()
    return images
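
# For reference: WPS stores an embedded cell image as a cell formula roughly of the
# form =DISPIMG("ID_xxx",1) (often serialized with an _xlfn. prefix). Illustrative
# sketch only, not copied from a real file:
#
#   <c r="C6" t="str">
#     <f>_xlfn.DISPIMG("ID_E3A1...",1)</f>
#   </c>
#
# The quoted ID corresponds to the cNvPr "name" attribute in xl/cellimages.xml,
# which is why the code above first maps name -> rId -> media bytes and then scans
# each worksheet's cells for DISPIMG(...).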

def parse_merged_cells(file_path):
    """Parse merged-cell information."""
    merged_ranges = {}  # sheet_name -> [(start_row, end_row, start_col, end_col)]
    try:
        with zipfile.ZipFile(file_path, 'r') as zf:
            # Get the sheet names
            sheet_names = []
            if 'xl/workbook.xml' in zf.namelist():
                wb_xml = zf.read('xl/workbook.xml').decode('utf-8')
                for match in re.finditer(r'<sheet[^>]*name="([^"]*)"', wb_xml):
                    sheet_names.append(match.group(1))
            for sheet_idx in range(1, 20):
                sheet_path = f'xl/worksheets/sheet{sheet_idx}.xml'
                if sheet_path not in zf.namelist():
                    break
                sheet_name = sheet_names[sheet_idx - 1] if sheet_idx <= len(sheet_names) else f'Sheet{sheet_idx}'
                sheet_xml = zf.read(sheet_path).decode('utf-8')
                # Find merged cells, e.g. <mergeCell ref="B6:B7"/>
                merged_ranges[sheet_name] = []
                for match in re.finditer(r'<mergeCell ref="([A-Z]+)(\d+):([A-Z]+)(\d+)"', sheet_xml):
                    start_col = match.group(1)
                    start_row = int(match.group(2))
                    end_col = match.group(3)
                    end_row = int(match.group(4))
                    merged_ranges[sheet_name].append((start_row, end_row, start_col, end_col))
    except Exception as e:
        print(f"解析合并单元格出错: {e}")
    return merged_ranges

def col_letter_to_index(col):
    """A->0, B->1, ..., Z->25, AA->26, ..."""
    result = 0
    for char in col:
        result = result * 26 + (ord(char) - ord('A') + 1)
    return result - 1
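
# Quick sanity check for the conversion above (illustrative values only; runs once
# at import time and has no side effects):
assert col_letter_to_index('A') == 0
assert col_letter_to_index('Z') == 25
assert col_letter_to_index('AA') == 26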

def find_header_row(df, max_rows=20):
    """Locate the header row."""
    keywords = ['款号', '货号', 'SKU', '序号', '编号', '图片', '款式']
    for idx in range(min(max_rows, len(df))):
        row = df.iloc[idx]
        for val in row.values:
            if pd.notna(val):
                val_str = str(val).strip()
                for kw in keywords:
                    if kw in val_str:
                        return idx
    return 0

def load_excel(file, progress=gr.Progress()):
    """Load the Excel file."""
    if file is None:
        return "请先上传Excel文件", [], {}, []
    all_data = []
    all_columns = []
    column_set = set()
    try:
        file_path = file if isinstance(file, str) else file.name
        file_size = os.path.getsize(file_path) / (1024 * 1024)
        # Warn about large files, but do not block the upload
        size_warning = ""
        if file_size > 500:
            size_warning = f"⚠️ 大文件 ({file_size:.0f}MB),处理中请耐心等待...\n"
        progress(0.05, desc=f"读取文件 ({file_size:.0f}MB)...")
        progress(0.1, desc="提取图片...")
        # Try both image formats
        all_images = extract_wps_cellimages(file_path)        # WPS in-cell DISPIMG images
        floating_images = extract_floating_images(file_path)  # traditional floating images
        # Merge them (prefer DISPIMG; fall back to floating images)
        for key, data in floating_images.items():
            if key not in all_images:
                all_images[key] = data
        progress(0.2, desc="解析合并单元格...")
        merged_ranges = parse_merged_cells(file_path)
        progress(0.3, desc="读取数据...")
        xl = pd.ExcelFile(file_path)
        total_sheets = len(xl.sheet_names)
        for sheet_idx, sheet_name in enumerate(xl.sheet_names):
            progress(0.3 + 0.6 * (sheet_idx / total_sheets), desc=f"处理 {sheet_name}...")
            df = pd.read_excel(xl, sheet_name=sheet_name, header=None)
            if df.empty:
                continue
            header_row = find_header_row(df)
            raw_headers = df.iloc[header_row].tolist()
            # Column mapping
            col_mapping = {}
            for i, h in enumerate(raw_headers):
                if pd.notna(h):
                    col_name = str(h).strip()
                    if col_name and 'DISPIMG' not in col_name and col_name not in ['nan', 'None']:
                        col_mapping[i] = col_name
                        if col_name not in column_set:
                            column_set.add(col_name)
                            all_columns.append(col_name)
            # Find the SKU column
            sku_col = None
            sku_keywords = ['款号', '货号', 'SKU', 'sku', '编号']
            for i, name in col_mapping.items():
                for kw in sku_keywords:
                    if kw in name:
                        sku_col = i
                        break
                if sku_col is not None:
                    break
            if sku_col is None:
                continue
            # Merged cells: build a row -> SKU mapping
            row_to_sku = {}
            sheet_merges = merged_ranges.get(sheet_name, [])
            # First read every non-empty SKU
            for idx in range(header_row + 1, len(df)):
                row = df.iloc[idx]
                sku_val = row.iloc[sku_col] if sku_col < len(row) else None
                if pd.notna(sku_val):
                    sku_str = str(sku_val).strip()
                    if sku_str and 'DISPIMG' not in sku_str:
                        excel_row = idx + 1
                        row_to_sku[excel_row] = sku_str
            # Merged cells: fill the SKU into rows that the merge leaves blank
            for start_row, end_row, start_col, end_col in sheet_merges:
                # Check whether this merge covers the SKU column. Compare by column
                # index (via col_letter_to_index) rather than by letter string, so
                # multi-letter columns such as "AA" are handled correctly.
                if col_letter_to_index(start_col) <= sku_col <= col_letter_to_index(end_col):
                    # Take the SKU value from the first row of the merged range
                    merge_sku = row_to_sku.get(start_row)
                    if merge_sku:
                        # Fill it into every row covered by the merge
                        for r in range(start_row, end_row + 1):
                            if r not in row_to_sku:
                                row_to_sku[r] = merge_sku
            # Read every data row
            for idx in range(header_row + 1, len(df)):
                excel_row = idx + 1
                # Get the SKU (preferring the merged-cell mapping)
                sku = row_to_sku.get(excel_row)
                if not sku:
                    continue
                row = df.iloc[idx]
                has_image = (sheet_name, excel_row) in all_images
                unique_id = f"{sku}__{sheet_name}__{excel_row}"
                item = {
                    '_sku': sku,
                    '_sheet': sheet_name,
                    '_excel_row': excel_row,
                    '_has_image': has_image,
                    '_unique_id': unique_id
                }
                # Read every mapped column
                for col_idx, col_name in col_mapping.items():
                    if col_idx < len(row):
                        val = row.iloc[col_idx]
                        if pd.notna(val):
                            val_str = str(val).strip()
                            if 'DISPIMG' not in val_str:
                                item[col_name] = val_str
                            else:
                                item[col_name] = ''
                        else:
                            item[col_name] = ''
                    else:
                        item[col_name] = ''
                # Check for an empty row (nothing filled in besides the SKU)
                has_content = False
                for col_name in col_mapping.values():
                    if col_name not in ['款号', '货号', '编号', 'SKU']:
                        if item.get(col_name, ''):
                            has_content = True
                            break
                if has_content or has_image:
                    all_data.append(item)
        xl.close()
        # Make sure every item carries every column
        for item in all_data:
            for col in all_columns:
                if col not in item:
                    item[col] = ''
        progress(1.0, desc="完成!")
        sheets_count = len(set(item['_sheet'] for item in all_data))
        images_count = sum(1 for item in all_data if item['_has_image'])
        status = f"✅ 加载成功! {len(all_data)}条数据, {sheets_count}个Sheet, {images_count}张图片, {len(all_columns)}个字段"
        if size_warning:
            status = size_warning + status
        return status, all_data, all_images, all_columns
    except Exception as e:
        import traceback
        traceback.print_exc()
        return f"❌ 加载失败: {str(e)}", [], {}, []

def search_sku(keyword, all_data):
    """Search SKUs - supports one style in multiple colors."""
    if not keyword or not all_data:
        return []
    keyword = keyword.lower().strip()
    matches = []
    # Group by SKU first
    sku_groups = {}
    for item in all_data:
        if keyword in item['_sku'].lower():
            sku = item['_sku']
            if sku not in sku_groups:
                sku_groups[sku] = []
            sku_groups[sku].append(item)
    # Build the options for each SKU
    for sku, items in sku_groups.items():
        # If the style comes in several colors, add an "all colors" option
        if len(items) > 1:
            # Collect every color
            colors = []
            for item in items:
                color = ''
                for key in ['大货颜色', '颜色', '色号', '新色']:
                    if key in item and item[key]:
                        color = item[key].replace('\n', '/').strip()
                        break
                if color:
                    colors.append(color)
            # Add the "all colors" option
            color_preview = ', '.join(colors[:3])
            if len(colors) > 3:
                color_preview += f'...(共{len(colors)}色)'
            has_any_image = any(item['_has_image'] for item in items)
            all_label = f"🎨 {sku} - 全部颜色 ({color_preview}) [{items[0]['_sheet']}]"
            if has_any_image:
                all_label += " 📷"
            # Mark the "all colors" option with a special prefix
            all_ids = "__ALL__" + "||".join(item['_unique_id'] for item in items)
            matches.append((all_label, all_ids))
        # Add one option per individual color
        for item in items:
            color = ''
            for key in ['大货颜色', '颜色', '色号', '新色']:
                if key in item and item[key]:
                    color = item[key].replace('\n', '/').strip()
                    break
            label = f"{item['_sku']}"
            if color:
                label += f" - {color}"
            label += f" [{item['_sheet']}]"
            if item['_has_image']:
                label += " 📷"
            matches.append((label, item['_unique_id']))
    return matches
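
# search_sku returns (label, value) pairs that feed the gr.CheckboxGroup below.
# An illustrative result for a two-color style (values invented for the example):
#
#   [("🎨 LRZ-010 - 全部颜色 (黑色, 白色) [Sheet1] 📷",
#     "__ALL__LRZ-010__Sheet1__6||LRZ-010__Sheet1__7"),
#    ("LRZ-010 - 黑色 [Sheet1] 📷", "LRZ-010__Sheet1__6"),
#    ("LRZ-010 - 白色 [Sheet1]", "LRZ-010__Sheet1__7")]
#
# on_add later splits values that start with "__ALL__" back into individual IDs.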

def get_image_base64(all_images, item, max_size=80):
    """Return a base64-encoded thumbnail of the item's image, or None."""
    key = (item['_sheet'], item['_excel_row'])
    if key not in all_images:
        return None
    try:
        img_data = all_images[key]
        img = Image.open(io.BytesIO(img_data))
        img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
        buffer = io.BytesIO()
        img.save(buffer, format='PNG')
        return base64.b64encode(buffer.getvalue()).decode()
    except Exception:
        return None

def update_selected_table(selected_ids, all_data, all_images, columns):
    """Refresh the table of currently selected styles."""
    if not selected_ids or not all_data:
        return "<p style='text-align:center;color:#888;padding:30px;'>还没有选择款式</p>", 0
    selected_items = [item for item in all_data if item['_unique_id'] in selected_ids]
    if not selected_items:
        return "<p style='text-align:center;color:#888;padding:30px;'>还没有选择款式</p>", 0
    display_cols = []
    skip_cols = ['款式图', '图片', '主图', '商品图']
    for col in columns:
        if not col.startswith('_') and col not in skip_cols:
            display_cols.append(col)
    header_html = "<th style='padding:8px;text-align:left;border-bottom:2px solid #c7d2fe;background:#f5f3ff;'>序号</th>"
    header_html += "<th style='padding:8px;text-align:left;border-bottom:2px solid #c7d2fe;background:#f5f3ff;'>图片</th>"
    for col in display_cols:
        header_html += f"<th style='padding:8px;text-align:left;border-bottom:2px solid #c7d2fe;background:#f5f3ff;white-space:nowrap;'>{col}</th>"
    header_html += "<th style='padding:8px;text-align:left;border-bottom:2px solid #c7d2fe;background:#f5f3ff;'>来源</th>"
    rows_html = ""
    for i, item in enumerate(selected_items, 1):
        b64 = get_image_base64(all_images, item)
        if b64:
            img_html = f'<img src="data:image/png;base64,{b64}" style="width:60px;height:60px;object-fit:cover;border-radius:4px;">'
        else:
            img_html = '<div style="width:60px;height:60px;background:#f0f0f0;border-radius:4px;display:flex;align-items:center;justify-content:center;color:#aaa;font-size:10px;">无图</div>'
        row_html = f"<td style='padding:8px;border-bottom:1px solid #eee;'>{i}</td>"
        row_html += f"<td style='padding:8px;border-bottom:1px solid #eee;'>{img_html}</td>"
        for col in display_cols:
            val = item.get(col, '') or ''
            style = "padding:8px;border-bottom:1px solid #eee;"
            if '款号' in col or '货号' in col:
                style += "font-weight:600;color:#4f46e5;"
            elif '价' in col:
                style += "font-weight:600;color:#f59e0b;"
            elif '颜色' in col:
                style += "color:#10b981;"
            display_val = str(val)[:20] + '..' if len(str(val)) > 20 else val
            row_html += f"<td style='{style}'>{display_val}</td>"
        row_html += f"<td style='padding:8px;border-bottom:1px solid #eee;'><span style='background:#e0e7ff;color:#4338ca;padding:2px 6px;border-radius:3px;font-size:11px;'>{item['_sheet']}</span></td>"
        rows_html += f"<tr>{row_html}</tr>"
    html = f'''<div style="overflow-x:auto;max-height:500px;overflow-y:auto;">
    <table style="width:100%;border-collapse:collapse;font-size:12px;min-width:600px;">
    <thead><tr>{header_html}</tr></thead>
    <tbody>{rows_html}</tbody>
    </table></div>'''
    return html, len(selected_items)

from collections import defaultdict


def export_excel_with_images(selected_ids, customer_name, all_data, all_images, columns):
    """Export to Excel: grouped by SKU, one row per color, with merged cells."""
    if not selected_ids or not all_data:
        return None
    # 1. Collect the selected records
    selected_items = [item for item in all_data if item['_unique_id'] in selected_ids]
    if not selected_items:
        return None
    # 2. Create a new workbook
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "PAMIR直播产品详情"
    # 3. Group by SKU
    sku_groups = defaultdict(list)
    for item in selected_items:
        sku_groups[item['_sku']].append(item)
    # 4. Write the title and header rows
    row = 1
    # Title row
    ws.merge_cells(start_row=1, end_row=1, start_column=1, end_column=16)
    title_cell = ws.cell(row=row, column=1, value="PAMIR直播产品详情")
    title_cell.font = openpyxl.styles.Font(size=16, bold=True)
    title_cell.alignment = openpyxl.styles.Alignment(horizontal='center', vertical='center')
    row += 1
    # Header row
    headers = ["序号", "款号", "图片", "材质成分", "大货尺码",
               "颜色", "XS", "S", "M", "L", "XL", "2XL", "3XL", "4XL", "F", "直播价"]
    for col_idx, header in enumerate(headers, 1):
        cell = ws.cell(row=row, column=col_idx, value=header)
        cell.font = openpyxl.styles.Font(bold=True)
        cell.alignment = openpyxl.styles.Alignment(horizontal='center', vertical='center')
    row += 1
    # Column widths (adjust as needed)
    ws.column_dimensions['A'].width = 6
    ws.column_dimensions['B'].width = 10
    ws.column_dimensions['C'].width = 12
    ws.column_dimensions['D'].width = 18
    ws.column_dimensions['E'].width = 10
    ws.column_dimensions['F'].width = 10
    for col_letter in ['G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O']:
        ws.column_dimensions[col_letter].width = 6
    ws.column_dimensions['P'].width = 10  # 直播价
    # 5. Write each SKU group
    temp_files = []
    seq = 1
    for sku in sorted(sku_groups.keys()):
        items = sku_groups[sku]
        group_size = len(items)
        start_row = row
        end_row = row + group_size - 1
        # 5.1 Write each row's color and image first
        for i, item in enumerate(items):
            r = row + i
            ws.row_dimensions[r].height = 80
            # Color
            color = ""
            for key in ["大货颜色", "颜色", "色号", "新色"]:
                if key in item and item[key]:
                    color = str(item[key]).replace("\n", "/").strip()
                    break
            ws.cell(row=r, column=6, value=color)
            # Image
            img_key = (item['_sheet'], item['_excel_row'])
            if img_key in all_images:
                try:
                    img_data = all_images[img_key]
                    img = Image.open(io.BytesIO(img_data))
                    max_size = 70
                    orig_w, orig_h = img.size
                    ratio = min(max_size / orig_w, max_size / orig_h)
                    new_w = int(orig_w * ratio)
                    new_h = int(orig_h * ratio)
                    img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
                    tmp_path = os.path.join(
                        tempfile.gettempdir(),
                        f"img_{sku}_{i}_{datetime.now().timestamp()}.png"
                    )
                    img.save(tmp_path, "PNG")
                    temp_files.append(tmp_path)
                    xl_img = XLImage(tmp_path)
                    xl_img.width = new_w
                    xl_img.height = new_h
                    xl_img.anchor = f"C{r}"
                    ws.add_image(xl_img)
                except Exception as e:
                    print(f"图片处理失败: {e}")
        # 5.2 Merge the cells of the shared fields
        merge_cols = [1, 2, 4, 5, 16]  # 序号, 款号, 材质成分, 大货尺码, 直播价
        if group_size > 1:
            for col in merge_cols:
                ws.merge_cells(start_row=start_row, end_row=end_row,
                               start_column=col, end_column=col)
        # 5.3 Write the shared fields into start_row
        ws.cell(row=start_row, column=1, value=seq)
        ws.cell(row=start_row, column=2, value=sku)
        first = items[0]
        material = first.get("材质成分") or first.get("材质") or ""
        size_range = first.get("大货尺码") or first.get("尺码") or ""
        live_price = first.get("直播价") or ""
        ws.cell(row=start_row, column=4, value=material)
        ws.cell(row=start_row, column=5, value=size_range)
        ws.cell(row=start_row, column=16, value=live_price)
        seq += 1
        row = end_row + 1
    # 6. Save the file and clean up the temporary images
    today = datetime.now().strftime('%Y-%m-%d')
    customer = customer_name.strip() if customer_name else '客户'
    filename = f"{customer}的选款_{today}.xlsx"
    output_path = os.path.join(tempfile.gettempdir(), filename)
    wb.save(output_path)
    wb.close()
    for tmp_path in temp_files:
        try:
            os.unlink(tmp_path)
        except Exception:
            pass
    return output_path
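
# Illustrative layout of the exported sheet for one style in two colors (values are
# made up; columns G-O are the per-size quantity columns and are left blank here):
#
#   序号 | 款号    | 图片  | 材质成分 | 大货尺码 | 颜色 | ... | 直播价
#   1    | LRZ-010 | <img> | 棉95%    | S-XL     | 黑色 |     | 199
#        |         | <img> |          |          | 白色 |     |
#
# 序号/款号/材质成分/大货尺码/直播价 are merged vertically across the group
# (merge_cols above), while 颜色 and 图片 stay one row per color.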

# ========== Gradio UI ==========
with gr.Blocks(title="款式选择器") as app:
    all_data_state = gr.State([])
    all_images_state = gr.State({})
    selected_ids_state = gr.State([])
    columns_state = gr.State([])
    gr.Markdown("""# 📦 款式选择器 v7
✅ WPS嵌入图片 + 浮动图片 + 合并单元格(同款多颜色) + 动态字段 + 支持大文件
""")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📤 上传款式表")
            file_input = gr.File(label="选择Excel文件", file_types=[".xlsx", ".xls"])
            load_status = gr.Textbox(label="状态", value="等待上传...", interactive=False, lines=3)
            gr.Markdown("### 🔍 搜索款号")
            search_input = gr.Textbox(label="输入款号关键字", placeholder="如:LRZ-010...")
            search_results = gr.CheckboxGroup(label="搜索结果", choices=[])
            add_btn = gr.Button("➕ 添加选中项", variant="primary")
        with gr.Column(scale=2):
            gr.Markdown("### ✅ 已选款式")
            selected_count = gr.Number(label="已选数量", value=0, interactive=False)
            selected_table = gr.HTML("<p style='text-align:center;color:#888;padding:30px;'>还没有选择款式</p>")
            gr.Markdown("### 📥 导出")
            with gr.Row():
                customer_input = gr.Textbox(label="客户名", placeholder="张三", scale=2)
                clear_btn = gr.Button("🗑️ 清空", scale=1)
            export_btn = gr.Button("📥 生成Excel(带图片)", variant="primary")
            export_file = gr.File(label="点击下载")
            gr.Markdown("### 📋 识别到的字段")
            columns_display = gr.Textbox(label="", interactive=False, value="上传文件后显示...")

    def on_file_upload(file, progress=gr.Progress()):
        status, data, images, cols = load_excel(file, progress)
        cols_text = ", ".join(cols) if cols else "无"
        return status, data, images, cols, [], gr.update(choices=[]), cols_text

    file_input.change(
        on_file_upload,
        [file_input],
        [load_status, all_data_state, all_images_state, columns_state, selected_ids_state, search_results, columns_display]
    )

    def on_search(keyword, all_data):
        if not keyword or not all_data:
            return gr.update(choices=[], value=[])
        matches = search_sku(keyword, all_data)
        return gr.update(choices=matches, value=[])

    search_input.change(on_search, [search_input, all_data_state], [search_results])

    def on_add(search_result, current_selected, all_data, all_images, columns):
        if not search_result:
            html, count = update_selected_table(current_selected, all_data, all_images, columns)
            return current_selected, html, count, gr.update(value=[])
        new_selected = list(current_selected) if current_selected else []
        for uid in search_result:
            # Handle the "all colors" option
            if uid.startswith("__ALL__"):
                # Split out the individual IDs
                all_ids = uid[7:].split("||")
                for single_id in all_ids:
                    if single_id not in new_selected:
                        new_selected.append(single_id)
            else:
                if uid not in new_selected:
                    new_selected.append(uid)
        html, count = update_selected_table(new_selected, all_data, all_images, columns)
        return new_selected, html, count, gr.update(value=[])

    add_btn.click(
        on_add,
        [search_results, selected_ids_state, all_data_state, all_images_state, columns_state],
        [selected_ids_state, selected_table, selected_count, search_results]
    )

    def on_clear(all_data, all_images, columns):
        html, count = update_selected_table([], all_data, all_images, columns)
        return [], html, count

    clear_btn.click(
        on_clear,
        [all_data_state, all_images_state, columns_state],
        [selected_ids_state, selected_table, selected_count]
    )

    def on_export(selected_ids, customer_name, all_data, all_images, columns):
        if not selected_ids:
            return None
        return export_excel_with_images(selected_ids, customer_name, all_data, all_images, columns)

    export_btn.click(
        on_export,
        [selected_ids_state, customer_input, all_data_state, all_images_state, columns_state],
        [export_file]
    )

app.launch()