# -*- coding: utf-8 -*- """ 款式选择器 v7 - 支持WPS嵌入单元格图片 (DISPIMG) - 支持传统浮动图片 (Drawing) - 支持合并单元格(同款多颜色) - 动态列识别 """ import gradio as gr import pandas as pd import openpyxl from openpyxl.drawing.image import Image as XLImage from PIL import Image import io import base64 import os import tempfile from datetime import datetime import zipfile import re from xml.etree import ElementTree as ET def extract_floating_images(file_path): """提取传统浮动图片(Drawing方式)""" images = {} try: with zipfile.ZipFile(file_path, 'r') as zf: # 1. 读取媒体文件 media_files = {} for name in zf.namelist(): if name.startswith('xl/media/'): img_name = os.path.basename(name) media_files[img_name] = zf.read(name) if not media_files: return images # 2. 获取sheet名称 sheet_names = [] if 'xl/workbook.xml' in zf.namelist(): wb_xml = zf.read('xl/workbook.xml').decode('utf-8') for match in re.finditer(r']*name="([^"]*)"', wb_xml): sheet_names.append(match.group(1)) # 3. 遍历每个sheet的drawing关系 for sheet_idx in range(1, 20): sheet_rels_path = f'xl/worksheets/_rels/sheet{sheet_idx}.xml.rels' if sheet_rels_path not in zf.namelist(): continue sheet_name = sheet_names[sheet_idx-1] if sheet_idx <= len(sheet_names) else f'Sheet{sheet_idx}' # 找到关联的drawing文件 rels_xml = zf.read(sheet_rels_path).decode('utf-8') drawing_match = re.search(r'(drawing\d+)\.xml', rels_xml) if not drawing_match: continue drawing_name = drawing_match.group(1) drawing_path = f'xl/drawings/{drawing_name}.xml' drawing_rels_path = f'xl/drawings/_rels/{drawing_name}.xml.rels' if drawing_path not in zf.namelist(): continue # 4. 解析drawing的rels获取rId到图片的映射 rid_to_image = {} if drawing_rels_path in zf.namelist(): drels_xml = zf.read(drawing_rels_path).decode('utf-8') # 匹配Relationship标签,提取Id和Target(属性顺序不固定) for rel_tag in re.finditer(r']+/?>', drels_xml): tag_content = rel_tag.group(0) id_match = re.search(r'Id="(rId\d+)"', tag_content) target_match = re.search(r'Target="([^"]*)"', tag_content) if id_match and target_match: rid = id_match.group(1) target = target_match.group(1) if 'media/' in target: img_name = os.path.basename(target) if img_name in media_files: rid_to_image[rid] = media_files[img_name] # 5. 解析drawing.xml获取图片位置 drawing_xml = zf.read(drawing_path).decode('utf-8') # 匹配 oneCellAnchor 或 twoCellAnchor(支持带命名空间前缀如xdr:) # 1... anchor_pattern = r'<(?:\w+:)?(?:oneCellAnchor|twoCellAnchor)[^>]*>.*?<(?:\w+:)?from>.*?<(?:\w+:)?row>(\d+).*?.*?r:embed="(rId\d+)".*?' for anchor_match in re.finditer(anchor_pattern, drawing_xml, re.DOTALL): row = int(anchor_match.group(1)) + 1 # 0-indexed -> 1-indexed rid = anchor_match.group(2) if rid in rid_to_image: key = (sheet_name, row) if key not in images: images[key] = rid_to_image[rid] except Exception as e: print(f"提取浮动图片出错: {e}") import traceback traceback.print_exc() return images def extract_wps_cellimages(file_path): """提取WPS嵌入单元格的图片""" images = {} image_id_to_data = {} try: with zipfile.ZipFile(file_path, 'r') as zf: # 1. 读取媒体文件 media_files = {} for name in zf.namelist(): if name.startswith('xl/media/'): img_name = os.path.basename(name) media_files[img_name] = zf.read(name) # 2. 解析 cellimages.xml.rels rid_to_image = {} if 'xl/_rels/cellimages.xml.rels' in zf.namelist(): rels_xml = zf.read('xl/_rels/cellimages.xml.rels').decode('utf-8') rels_root = ET.fromstring(rels_xml) for rel in rels_root: rid = rel.get('Id') target = rel.get('Target') if rid and target and 'media/' in target: img_name = os.path.basename(target) if img_name in media_files: rid_to_image[rid] = media_files[img_name] # 3. 解析 cellimages.xml if 'xl/cellimages.xml' in zf.namelist(): cellimages_xml = zf.read('xl/cellimages.xml').decode('utf-8') namespaces = { 'etc': 'http://www.wps.cn/officeDocument/2017/etCustomData', 'xdr': 'http://schemas.openxmlformats.org/drawingml/2006/spreadsheetDrawing', 'a': 'http://schemas.openxmlformats.org/drawingml/2006/main', 'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships' } root = ET.fromstring(cellimages_xml) for cellImage in root.findall('.//etc:cellImage', namespaces): pic = cellImage.find('.//xdr:pic', namespaces) if pic is not None: cNvPr = pic.find('.//xdr:cNvPr', namespaces) if cNvPr is not None: image_id = cNvPr.get('name') blip = pic.find('.//a:blip', namespaces) if blip is not None: rid = blip.get('{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed') if image_id and rid and rid in rid_to_image: image_id_to_data[image_id] = rid_to_image[rid] # 4. 获取sheet名称 sheet_names = [] if 'xl/workbook.xml' in zf.namelist(): wb_xml = zf.read('xl/workbook.xml').decode('utf-8') for match in re.finditer(r']*name="([^"]*)"', wb_xml): sheet_names.append(match.group(1)) # 5. 解析每个sheet的DISPIMG for sheet_idx in range(1, 20): sheet_path = f'xl/worksheets/sheet{sheet_idx}.xml' if sheet_path not in zf.namelist(): break sheet_name = sheet_names[sheet_idx-1] if sheet_idx <= len(sheet_names) else f'Sheet{sheet_idx}' sheet_xml = zf.read(sheet_path).decode('utf-8') # 修复:处理两种单元格格式 # 1. 自闭合: # 2. 正常闭合: ... # 使用更精确的正则匹配正常闭合的单元格(包含DISPIMG的必然是正常闭合的) cell_pattern = r']*>([^<]*(?:<(?!/c>)[^<]*)*)' for cell_match in re.finditer(cell_pattern, sheet_xml, re.DOTALL): cell_content = cell_match.group(0) if 'DISPIMG' in cell_content: col = cell_match.group(1) row = int(cell_match.group(2)) img_match = re.search(r'DISPIMG\("([^&]+)"', cell_content) if img_match: image_id = img_match.group(1) if image_id in image_id_to_data: key = (sheet_name, row) if key not in images: images[key] = image_id_to_data[image_id] except Exception as e: print(f"提取图片出错: {e}") import traceback traceback.print_exc() return images def parse_merged_cells(file_path): """解析合并单元格信息""" merged_ranges = {} # sheet_name -> [(start_row, end_row, start_col, end_col, value)] try: with zipfile.ZipFile(file_path, 'r') as zf: # 获取sheet名称 sheet_names = [] if 'xl/workbook.xml' in zf.namelist(): wb_xml = zf.read('xl/workbook.xml').decode('utf-8') for match in re.finditer(r']*name="([^"]*)"', wb_xml): sheet_names.append(match.group(1)) for sheet_idx in range(1, 20): sheet_path = f'xl/worksheets/sheet{sheet_idx}.xml' if sheet_path not in zf.namelist(): break sheet_name = sheet_names[sheet_idx-1] if sheet_idx <= len(sheet_names) else f'Sheet{sheet_idx}' sheet_xml = zf.read(sheet_path).decode('utf-8') # 找合并单元格 merged_ranges[sheet_name] = [] for match in re.finditer(r'0, B->1, ..., Z->25, AA->26...""" result = 0 for char in col: result = result * 26 + (ord(char) - ord('A') + 1) return result - 1 def find_header_row(df, max_rows=20): """查找表头行""" keywords = ['款号', '货号', 'SKU', '序号', '编号', '图片', '款式'] for idx in range(min(max_rows, len(df))): row = df.iloc[idx] for val in row.values: if pd.notna(val): val_str = str(val).strip() for kw in keywords: if kw in val_str: return idx return 0 def load_excel(file, progress=gr.Progress()): """加载Excel文件""" if file is None: return "请先上传Excel文件", [], {}, [] all_data = [] all_columns = [] column_set = set() try: file_path = file if isinstance(file, str) else file.name file_size = os.path.getsize(file_path) / (1024 * 1024) # 大文件提示,但不阻止上传 size_warning = "" if file_size > 500: size_warning = f"⚠️ 大文件 ({file_size:.0f}MB),处理中请耐心等待...\n" progress(0.05, desc=f"读取文件 ({file_size:.0f}MB)...") progress(0.1, desc="提取图片...") # 同时尝试两种图片格式 all_images = extract_wps_cellimages(file_path) # WPS嵌入式DISPIMG floating_images = extract_floating_images(file_path) # 传统浮动图片 # 合并(优先使用DISPIMG,如果没有再用浮动图片) for key, data in floating_images.items(): if key not in all_images: all_images[key] = data progress(0.2, desc="解析合并单元格...") merged_ranges = parse_merged_cells(file_path) progress(0.3, desc="读取数据...") xl = pd.ExcelFile(file_path) total_sheets = len(xl.sheet_names) for sheet_idx, sheet_name in enumerate(xl.sheet_names): progress(0.3 + 0.6 * (sheet_idx / total_sheets), desc=f"处理 {sheet_name}...") df = pd.read_excel(xl, sheet_name=sheet_name, header=None) if df.empty: continue header_row = find_header_row(df) raw_headers = df.iloc[header_row].tolist() # 列映射 col_mapping = {} for i, h in enumerate(raw_headers): if pd.notna(h): col_name = str(h).strip() if col_name and 'DISPIMG' not in col_name and col_name not in ['nan', 'None']: col_mapping[i] = col_name if col_name not in column_set: column_set.add(col_name) all_columns.append(col_name) # 找款号列 sku_col = None sku_keywords = ['款号', '货号', 'SKU', 'sku', '编号'] for i, name in col_mapping.items(): for kw in sku_keywords: if kw in name: sku_col = i break if sku_col is not None: break if sku_col is None: continue # 处理合并单元格 - 建立行->款号的映射 row_to_sku = {} sheet_merges = merged_ranges.get(sheet_name, []) # 先读取所有非空款号 for idx in range(header_row + 1, len(df)): row = df.iloc[idx] sku_val = row.iloc[sku_col] if sku_col < len(row) else None if pd.notna(sku_val): sku_str = str(sku_val).strip() if sku_str and 'DISPIMG' not in sku_str: excel_row = idx + 1 row_to_sku[excel_row] = sku_str # 处理合并单元格 - 填充空行的款号 for start_row, end_row, start_col, end_col in sheet_merges: # 检查是否是款号列的合并 sku_col_letter = openpyxl.utils.get_column_letter(sku_col + 1) if start_col <= sku_col_letter <= end_col: # 找到合并区域的款号值 merge_sku = row_to_sku.get(start_row) if merge_sku: # 填充到所有合并的行 for r in range(start_row, end_row + 1): if r not in row_to_sku: row_to_sku[r] = merge_sku # 读取所有数据行 for idx in range(header_row + 1, len(df)): excel_row = idx + 1 # 获取款号(优先从合并单元格映射) sku = row_to_sku.get(excel_row) if not sku: continue row = df.iloc[idx] has_image = (sheet_name, excel_row) in all_images unique_id = f"{sku}__{sheet_name}__{excel_row}" item = { '_sku': sku, '_sheet': sheet_name, '_excel_row': excel_row, '_has_image': has_image, '_unique_id': unique_id } # 读取所有列 for col_idx, col_name in col_mapping.items(): if col_idx < len(row): val = row.iloc[col_idx] if pd.notna(val): val_str = str(val).strip() if 'DISPIMG' not in val_str: item[col_name] = val_str else: item[col_name] = '' else: item[col_name] = '' else: item[col_name] = '' # 检查是否是空行(除了款号外其他都是空的) has_content = False for col_name in col_mapping.values(): if col_name not in ['款号', '货号', '编号', 'SKU']: if item.get(col_name, ''): has_content = True break if has_content or has_image: all_data.append(item) xl.close() # 确保所有item都有所有列 for item in all_data: for col in all_columns: if col not in item: item[col] = '' progress(1.0, desc="完成!") sheets_count = len(set(item['_sheet'] for item in all_data)) images_count = sum(1 for item in all_data if item['_has_image']) status = f"✅ 加载成功! {len(all_data)}条数据, {sheets_count}个Sheet, {images_count}张图片, {len(all_columns)}个字段" if size_warning: status = size_warning + status return status, all_data, all_images, all_columns except Exception as e: import traceback traceback.print_exc() return f"❌ 加载失败: {str(e)}", [], {}, [] def search_sku(keyword, all_data): """搜索款号 - 支持同款多颜色""" if not keyword or not all_data: return [] keyword = keyword.lower().strip() matches = [] # 先按款号分组 sku_groups = {} for item in all_data: if keyword in item['_sku'].lower(): sku = item['_sku'] if sku not in sku_groups: sku_groups[sku] = [] sku_groups[sku].append(item) # 为每个款号生成选项 for sku, items in sku_groups.items(): # 如果同款有多个颜色,添加"全部颜色"选项 if len(items) > 1: # 收集所有颜色 colors = [] for item in items: color = '' for key in ['大货颜色', '颜色', '色号', '新色']: if key in item and item[key]: color = item[key].replace('\n', '/').strip() break if color: colors.append(color) # 添加"全部颜色"选项 color_preview = ', '.join(colors[:3]) if len(colors) > 3: color_preview += f'...(共{len(colors)}色)' has_any_image = any(item['_has_image'] for item in items) all_label = f"🎨 {sku} - 全部颜色 ({color_preview}) [{items[0]['_sheet']}]" if has_any_image: all_label += " 📷" # 用特殊前缀标记"全部"选项 all_ids = "__ALL__" + "||".join(item['_unique_id'] for item in items) matches.append((all_label, all_ids)) # 添加每个单独颜色选项 for item in items: color = '' for key in ['大货颜色', '颜色', '色号', '新色']: if key in item and item[key]: color = item[key].replace('\n', '/').strip() break label = f"{item['_sku']}" if color: label += f" - {color}" label += f" [{item['_sheet']}]" if item['_has_image']: label += " 📷" matches.append((label, item['_unique_id'])) return matches def get_image_base64(all_images, item, max_size=80): """获取图片base64""" key = (item['_sheet'], item['_excel_row']) if key not in all_images: return None try: img_data = all_images[key] img = Image.open(io.BytesIO(img_data)) img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) buffer = io.BytesIO() img.save(buffer, format='PNG') return base64.b64encode(buffer.getvalue()).decode() except: return None def update_selected_table(selected_ids, all_data, all_images, columns): """更新已选表格""" if not selected_ids or not all_data: return "

还没有选择款式

", 0 selected_items = [item for item in all_data if item['_unique_id'] in selected_ids] if not selected_items: return "

还没有选择款式

", 0 display_cols = [] skip_cols = ['款式图', '图片', '主图', '商品图'] for col in columns: if not col.startswith('_') and col not in skip_cols: display_cols.append(col) header_html = "序号" header_html += "图片" for col in display_cols: header_html += f"{col}" header_html += "来源" rows_html = "" for i, item in enumerate(selected_items, 1): b64 = get_image_base64(all_images, item) if b64: img_html = f'' else: img_html = '
无图
' row_html = f"{i}" row_html += f"{img_html}" for col in display_cols: val = item.get(col, '') or '' style = "padding:8px;border-bottom:1px solid #eee;" if '款号' in col or '货号' in col: style += "font-weight:600;color:#4f46e5;" elif '价' in col: style += "font-weight:600;color:#f59e0b;" elif '颜色' in col: style += "color:#10b981;" display_val = str(val)[:20] + '..' if len(str(val)) > 20 else val row_html += f"{display_val}" row_html += f"{item['_sheet']}" rows_html += f"{row_html}" html = f'''
{header_html}{rows_html}
''' return html, len(selected_items) from collections import defaultdict def export_excel_with_images(selected_ids, customer_name, all_data, all_images, columns): """导出 Excel:按款号分组,多颜色 + 合并单元格版本""" if not selected_ids or not all_data: return None # 1. 找到所有被选中的记录 selected_items = [item for item in all_data if item['_unique_id'] in selected_ids] if not selected_items: return None # 2. 新建工作簿 wb = openpyxl.Workbook() ws = wb.active ws.title = "PAMIR直播产品详情" # 3. 按款号分组 sku_groups = defaultdict(list) for item in selected_items: sku_groups[item['_sku']].append(item) # 4. 写标题 + 表头 row = 1 # 标题行 ws.merge_cells(start_row=1, end_row=1, start_column=1, end_column=16) title_cell = ws.cell(row=row, column=1, value="PAMIR直播产品详情") title_cell.font = openpyxl.styles.Font(size=16, bold=True) title_cell.alignment = openpyxl.styles.Alignment(horizontal='center', vertical='center') row += 1 # 表头行 headers = ["序号", "款号", "图片", "材质成分", "大货尺码", "颜色", "XS", "S", "M", "L", "XL", "2XL", "3XL", "4XL", "F", "直播价"] for col_idx, header in enumerate(headers, 1): cell = ws.cell(row=row, column=col_idx, value=header) cell.font = openpyxl.styles.Font(bold=True) cell.alignment = openpyxl.styles.Alignment(horizontal='center', vertical='center') row += 1 # 列宽(可以按需要调整) ws.column_dimensions['A'].width = 6 ws.column_dimensions['B'].width = 10 ws.column_dimensions['C'].width = 12 ws.column_dimensions['D'].width = 18 ws.column_dimensions['E'].width = 10 ws.column_dimensions['F'].width = 10 for col_letter in ['G','H','I','J','K','L','M','N','O']: ws.column_dimensions[col_letter].width = 6 ws.column_dimensions['P'].width = 10 # 直播价 # 5. 写每个款号的数据 temp_files = [] seq = 1 for sku in sorted(sku_groups.keys()): items = sku_groups[sku] group_size = len(items) start_row = row end_row = row + group_size - 1 # 5.1 先写每一行的颜色 + 图片 for i, item in enumerate(items): r = row + i ws.row_dimensions[r].height = 80 # 颜色 color = "" for key in ["大货颜色", "颜色", "色号", "新色"]: if key in item and item[key]: color = str(item[key]).replace("\n", "/").strip() break ws.cell(row=r, column=6, value=color) # 图片 img_key = (item['_sheet'], item['_excel_row']) if img_key in all_images: try: img_data = all_images[img_key] img = Image.open(io.BytesIO(img_data)) max_size = 70 orig_w, orig_h = img.size ratio = min(max_size / orig_w, max_size / orig_h) new_w = int(orig_w * ratio) new_h = int(orig_h * ratio) img = img.resize((new_w, new_h), Image.Resampling.LANCZOS) tmp_path = os.path.join( tempfile.gettempdir(), f"img_{sku}_{i}_{datetime.now().timestamp()}.png" ) img.save(tmp_path, "PNG") temp_files.append(tmp_path) xl_img = XLImage(tmp_path) xl_img.width = new_w xl_img.height = new_h xl_img.anchor = f"C{r}" ws.add_image(xl_img) except Exception as e: print(f"图片处理失败: {e}") # 5.2 合并公共字段的单元格 merge_cols = [1, 2, 4, 5, 16] # 序号, 款号, 材质成分, 大货尺码, 直播价 if group_size > 1: for col in merge_cols: ws.merge_cells(start_row=start_row, end_row=end_row, start_column=col, end_column=col) # 5.3 在 start_row 写入公共字段 ws.cell(row=start_row, column=1, value=seq) ws.cell(row=start_row, column=2, value=sku) first = items[0] material = first.get("材质成分") or first.get("材质") or "" size_range = first.get("大货尺码") or first.get("尺码") or "" live_price = first.get("直播价") or "" ws.cell(row=start_row, column=4, value=material) ws.cell(row=start_row, column=5, value=size_range) ws.cell(row=start_row, column=16, value=live_price) seq += 1 row = end_row + 1 # 6. 保存文件 & 清理临时图片 today = datetime.now().strftime('%Y-%m-%d') customer = customer_name.strip() if customer_name else '客户' filename = f"{customer}的选款_{today}.xlsx" output_path = os.path.join(tempfile.gettempdir(), filename) wb.save(output_path) wb.close() for tmp_path in temp_files: try: os.unlink(tmp_path) except: pass return output_path # ========== Gradio 界面 ========== with gr.Blocks(title="款式选择器") as app: all_data_state = gr.State([]) all_images_state = gr.State({}) selected_ids_state = gr.State([]) columns_state = gr.State([]) gr.Markdown("""# 📦 款式选择器 v7 ✅ WPS嵌入图片 + 浮动图片 + 合并单元格(同款多颜色) + 动态字段 + 支持大文件 """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📤 上传款式表") file_input = gr.File(label="选择Excel文件", file_types=[".xlsx", ".xls"]) load_status = gr.Textbox(label="状态", value="等待上传...", interactive=False, lines=3) gr.Markdown("### 🔍 搜索款号") search_input = gr.Textbox(label="输入款号关键字", placeholder="如:LRZ-010...") search_results = gr.CheckboxGroup(label="搜索结果", choices=[]) add_btn = gr.Button("➕ 添加选中项", variant="primary") with gr.Column(scale=2): gr.Markdown("### ✅ 已选款式") selected_count = gr.Number(label="已选数量", value=0, interactive=False) selected_table = gr.HTML("

还没有选择款式

") gr.Markdown("### 📥 导出") with gr.Row(): customer_input = gr.Textbox(label="客户名", placeholder="张三", scale=2) clear_btn = gr.Button("🗑️ 清空", scale=1) export_btn = gr.Button("📥 生成Excel(带图片)", variant="primary") export_file = gr.File(label="点击下载") gr.Markdown("### 📋 识别到的字段") columns_display = gr.Textbox(label="", interactive=False, value="上传文件后显示...") def on_file_upload(file, progress=gr.Progress()): status, data, images, cols = load_excel(file, progress) cols_text = ", ".join(cols) if cols else "无" return status, data, images, cols, [], gr.update(choices=[]), cols_text file_input.change( on_file_upload, [file_input], [load_status, all_data_state, all_images_state, columns_state, selected_ids_state, search_results, columns_display] ) def on_search(keyword, all_data): if not keyword or not all_data: return gr.update(choices=[], value=[]) matches = search_sku(keyword, all_data) return gr.update(choices=matches, value=[]) search_input.change(on_search, [search_input, all_data_state], [search_results]) def on_add(search_result, current_selected, all_data, all_images, columns): if not search_result: html, count = update_selected_table(current_selected, all_data, all_images, columns) return current_selected, html, count, gr.update(value=[]) new_selected = list(current_selected) if current_selected else [] for uid in search_result: # 处理"全部颜色"选项 if uid.startswith("__ALL__"): # 提取所有单独的ID all_ids = uid[7:].split("||") for single_id in all_ids: if single_id not in new_selected: new_selected.append(single_id) else: if uid not in new_selected: new_selected.append(uid) html, count = update_selected_table(new_selected, all_data, all_images, columns) return new_selected, html, count, gr.update(value=[]) add_btn.click( on_add, [search_results, selected_ids_state, all_data_state, all_images_state, columns_state], [selected_ids_state, selected_table, selected_count, search_results] ) def on_clear(all_data, all_images, columns): html, count = update_selected_table([], all_data, all_images, columns) return [], html, count clear_btn.click( on_clear, [all_data_state, all_images_state, columns_state], [selected_ids_state, selected_table, selected_count] ) def on_export(selected_ids, customer_name, all_data, all_images, columns): if not selected_ids: return None return export_excel_with_images(selected_ids, customer_name, all_data, all_images, columns) export_btn.click( on_export, [selected_ids_state, customer_input, all_data_state, all_images_state, columns_state], [export_file] ) app.launch()