| | from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
|
| | from werkzeug.utils import secure_filename
|
| | import os
|
| | from pathlib import Path
|
| | import shutil
|
| | import io
|
| | import zipfile
|
| | import base64
|
| | import json
|
| | from pipeline import classify
|
| |
|
# Flask application object that every route below is registered on.
app = Flask(__name__)

# Upload destinations (relative paths under the static/ directory).
UPLOAD_FOLDER_SINGLE = 'static/uploads/single'
UPLOAD_FOLDER_MULTIPLE = 'static/uploads/multiple'

# File extensions accepted by allowed_file(); compared in lower case.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}

app.config.update(
    UPLOAD_FOLDER_SINGLE=UPLOAD_FOLDER_SINGLE,
    UPLOAD_FOLDER_MULTIPLE=UPLOAD_FOLDER_MULTIPLE,
    # Flask's request-size limit: 100 MiB.
    MAX_CONTENT_LENGTH=100 * 1024 * 1024,
    # Custom key (500 MiB); no code visible in this file reads it.
    MAX_CONTENT_LENGTH_REPORT=500 * 1024 * 1024,
)

# Make sure both upload folders exist as soon as the module is imported.
os.makedirs(UPLOAD_FOLDER_MULTIPLE, exist_ok=True)
os.makedirs(UPLOAD_FOLDER_SINGLE, exist_ok=True)
|
| |
|
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared
    case-insensitively. A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
|
| |
|
def clear_uploads(folder):
    """Empty *folder* by removing it (when present) and creating it afresh.

    Missing parent directories are created along the way.
    """
    target = Path(folder)
    if target.exists():
        # Drop the whole tree, including any sub-folders created later on.
        shutil.rmtree(target)
    target.mkdir(parents=True)
|
| |
|
| |
|
@app.route('/')
def index():
    """Render ``index.html`` for the site root."""
    page = 'index.html'
    return render_template(page)
|
| |
|
@app.route('/single')
def single():
    """Render ``single.html`` for the ``/single`` page."""
    page = 'single.html'
    return render_template(page)
|
| |
|
@app.route('/multiple')
def multiple():
    """Render ``multiple.html`` for the ``/multiple`` page."""
    page = 'multiple.html'
    return render_template(page)
|
| |
|
@app.route('/clear_uploads', methods=['POST'])
def clear_uploads_route():
    """Empty both upload folders and report the outcome as JSON.

    Returns a success message, or a 500 response carrying the exception
    text when clearing fails.
    """
    folder_keys = ('UPLOAD_FOLDER_SINGLE', 'UPLOAD_FOLDER_MULTIPLE')
    try:
        for config_key in folder_keys:
            clear_uploads(app.config[config_key])
        return jsonify({'message': 'Upload folders cleared successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
| |
|
| |
|
@app.route('/upload_single', methods=['POST'])
def upload_single():
    """Store one uploaded image as the sole content of the single-upload folder.

    Expects a multipart form field named ``file``. Responds with the
    sanitised filename on success, or a 400 error when the field is missing,
    empty, or has an extension that is not allowed.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    upload = request.files['file']
    if upload.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if not (upload and allowed_file(upload.filename)):
        return jsonify({'error': 'Invalid file type'}), 400

    single_dir = app.config['UPLOAD_FOLDER_SINGLE']
    # The folder only ever holds the most recent upload.
    clear_uploads(single_dir)

    safe_name = secure_filename(upload.filename)
    upload.save(os.path.join(single_dir, safe_name))

    payload = {
        'message': 'File uploaded successfully',
        'filename': safe_name,
    }
    return jsonify(payload)
|
| |
|
@app.route('/classify_single', methods=['POST'])
def classify_single():
    """Classify an image previously stored in the single-upload folder.

    Expects a JSON object with a ``filename`` key. Responds with the
    ``classification`` and ``result_table`` values returned by ``classify``.

    Error responses: 400 when the body carries no usable filename, 404 when
    the file does not exist, 500 (with the exception text) when
    classification raises.
    """
    # silent=True: a missing, malformed or non-JSON body yields None instead
    # of raising, so it is reported as a plain "no filename" error below.
    data = request.get_json(silent=True)
    filename = data.get('filename') if isinstance(data, dict) else None
    if not filename or not isinstance(filename, str):
        return jsonify({'error': 'No filename provided'}), 400

    # The name is client-controlled; reduce it to a bare, safe file name so
    # that values such as '../../etc/passwd' cannot escape the upload folder.
    # upload_single already returns names in this sanitised form.
    safe_name = secure_filename(filename)
    if not safe_name:
        return jsonify({'error': 'Invalid filename'}), 400

    filepath = os.path.join(app.config['UPLOAD_FOLDER_SINGLE'], safe_name)
    if not os.path.exists(filepath):
        return jsonify({'error': 'File not found'}), 404

    try:
        # classify() returns a 3-tuple; the failure labels are not part of
        # this endpoint's response.
        classification_result, result_table, _failure_labels = classify(filepath)
        return jsonify({
            'classification': classification_result,
            'result_table': result_table,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
| |
|
| |
|
@app.route('/upload_multiple', methods=['POST'])
def upload_multiple():
    """Queue one image of a batch in the ``temp`` sub-folder.

    Expects a multipart form field named ``file``. On success the response
    echoes the sanitised filename, the stored bytes base64-encoded, and the
    status ``'Queued'``. A missing field or disallowed extension yields 400;
    any failure while storing or reading the file yields 500.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400

    upload = request.files['file']
    if not (upload and allowed_file(upload.filename)):
        return jsonify({'error': 'Invalid file type'}), 400

    try:
        queue_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], 'temp')
        os.makedirs(queue_dir, exist_ok=True)

        safe_name = secure_filename(upload.filename)
        stored_path = os.path.join(queue_dir, safe_name)
        upload.save(stored_path)

        # Read the bytes back from disk so the response mirrors what was stored.
        with open(stored_path, 'rb') as stored:
            raw_bytes = stored.read()
        encoded = base64.b64encode(raw_bytes).decode()

        payload = {
            'filename': safe_name,
            'image': encoded,
            'status': 'Queued',
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
| |
|
@app.route('/classify_multiple', methods=['POST'])
def classify_multiple():
    """Classify the next queued image of a batch upload.

    Each call handles at most one allowed image from the ``temp`` sub-folder
    of the multiple-upload folder: the image is classified, moved into a
    sibling folder named after the lower-cased classification, and returned
    as JSON (``filename``, ``status``, ``labels``, base64 ``image``).

    When no image is queued the response is ``{'message': 'All files
    processed'}``. Any failure is reported as a 500 response carrying the
    exception text.
    """
    multiple_dir = app.config['UPLOAD_FOLDER_MULTIPLE']
    temp_dir = os.path.join(multiple_dir, 'temp')

    try:
        # The temp folder is only created by upload_multiple and disappears
        # whenever the upload folders are cleared; treat its absence as an
        # empty queue instead of letting os.listdir raise.
        if os.path.isdir(temp_dir):
            pending = [
                name for name in os.listdir(temp_dir)
                if os.path.isfile(os.path.join(temp_dir, name)) and allowed_file(name)
            ]
        else:
            pending = []

        if not pending:
            return jsonify({'message': 'All files processed'})

        # One file per request: the response describes exactly this image.
        filename = pending[0]
        filepath = os.path.join(temp_dir, filename)

        # classify() returns a 3-tuple; the result table is not used here.
        # assumes classification_result is a str (it is lower-cased below) — TODO confirm
        classification_result, _result_table, failure_labels = classify(filepath)

        # Segregate the image into a folder named after its classification.
        category_dir = os.path.join(multiple_dir, classification_result.lower())
        os.makedirs(category_dir, exist_ok=True)
        dest_path = os.path.join(category_dir, filename)
        shutil.move(filepath, dest_path)

        with open(dest_path, 'rb') as img_file:
            img_data = base64.b64encode(img_file.read()).decode()

        return jsonify({
            'filename': filename,
            'status': classification_result,
            'labels': failure_labels,
            'image': img_data,
        })
    except Exception as e:
        # Same JSON error shape as classify_single.
        return jsonify({'error': str(e)}), 500
|
| |
|
@app.route('/download_multiple/<category>')
def download_multiple(category):
    """Send every allowed image of one category folder as a zip attachment.

    *category* must be ``'pass'`` or ``'fail'`` (400 otherwise); a missing
    category folder yields 404. The archive is built in memory and delivered
    as ``<category>_images.zip``.
    """
    if category not in ('pass', 'fail'):
        return jsonify({'error': 'Invalid category'}), 400

    source_dir = os.path.join(app.config['UPLOAD_FOLDER_MULTIPLE'], category)
    if not os.path.exists(source_dir):
        return jsonify({'error': 'No images found'}), 404

    archive_buffer = io.BytesIO()
    with zipfile.ZipFile(archive_buffer, 'w') as archive:
        for entry in os.listdir(source_dir):
            entry_path = os.path.join(source_dir, entry)
            if not os.path.isfile(entry_path) or not allowed_file(entry):
                continue
            # Store each image flat, under its bare file name.
            archive.write(entry_path, entry)

    # Rewind so send_file streams the archive from its first byte.
    archive_buffer.seek(0)
    return send_file(
        archive_buffer,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f'{category}_images.zip',
    )
|
| |
|
@app.route('/save_report', methods=['POST'])
def save_report():
    """Render posted classification results into an HTML report file.

    Expects a JSON object with ``date``, ``time``, ``summary`` and a
    non-empty ``tableData`` list whose items carry ``serialNo``, ``image``,
    ``filename``, ``status`` and ``labels``. The report is produced from
    ``templates/report_template.html`` and written to
    ``Reports/<date>/Report_<date>_<time>.html``.

    Returns ``{'success': True}``; a 400 response for missing or unsafe
    input; a 500 response carrying the exception text for any other failure.
    """
    from html import escape  # function-scope import keeps this block self-contained

    try:
        # silent=True: a missing or malformed body becomes None and is
        # reported as invalid data instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid data provided'}), 400

        date = data.get('date')
        time = data.get('time')
        summary_content = data.get('summary')
        table_data = data.get('tableData', [])

        if not all([date, time, summary_content, table_data]):
            return jsonify({'error': 'Invalid data provided'}), 400

        # date and time are client-controlled and become path components;
        # refuse anything that could address a different directory.
        for component in (date, time):
            if (not isinstance(component, str)
                    or component in ('.', '..')
                    or '/' in component
                    or '\\' in component):
                return jsonify({'error': 'Invalid date or time'}), 400

        table_rows = []
        for item in table_data:
            status = item['status']
            status_class = escape(status.lower())
            status_text = escape(status)
            serial_text = escape(str(item['serialNo']))
            image_src = escape(str(item['image']))
            file_label = escape(str(item['filename']))

            # Labels are only shown for failed items; '-' marks an empty cell.
            labels_cell = '-'
            if status.lower() == 'fail' and item['labels']:
                labels = ' '.join(
                    f'<span class="label">{escape(str(label))}</span>'
                    for label in item['labels']
                )
                labels_cell = f'<div class="labels">{labels}</div>'

            table_rows.append(f"""
                <tr>
                    <td class="serial">{serial_text}</td>
                    <td class="image-col">
                        <img src="{image_src}" alt="{file_label}"
                             class="thumbnail" onclick="openModal(this)"
                             style="cursor: pointer;">
                        <div class="filename">{file_label}</div>
                    </td>
                    <td class="result-col">
                        <span class="result-{status_class}">{status_text}</span>
                    </td>
                    <td class="labels-col">
                        {labels_cell}
                    </td>
                </tr>
            """)

        date_dir = Path('Reports') / date
        date_dir.mkdir(parents=True, exist_ok=True)

        template_path = Path('templates/report_template.html')
        with open(template_path, 'r', encoding='utf-8') as f:
            template = f.read()

        # NOTE(review): summary_content is inserted unescaped, presumably
        # because the client sends ready-made HTML — confirm it is trusted.
        # NOTE(review): str.format fails if the template contains literal
        # '{' / '}' (e.g. inline CSS); save_report_chunk escapes them first —
        # confirm whether this route needs the same treatment.
        html_content = template.format(
            date=date,
            time=time,
            summary_content=summary_content,
            table_rows='\n'.join(table_rows),
        )

        report_path = date_dir / f'Report_{date}_{time}.html'
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(html_content)

        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
| |
|
| |
|
@app.route('/save_report_chunk', methods=['POST'])
def save_report_chunk():
    """Receive one chunk of report data; build the HTML report on the last one.

    Expects a JSON object with ``chunkNumber``, ``totalChunks``, ``date``,
    ``time``, ``chunk`` (a list of result items) and optionally ``summary``
    (only the value sent with chunk 0 is kept). Every chunk is stored as
    ``Reports/<date>/temp/chunk_<n>.json``.

    When ``chunkNumber == totalChunks - 1`` all chunks are read back in
    order and the items whose status is ``fail`` are rendered as table rows
    into ``templates/report_template.html``. The ``{multiple_styles}``
    placeholder of that template receives the ``<style>`` content of
    ``templates/multiple.html``. The report is written to
    ``Reports/<date>/Report_<date>_<time>.html`` and the temp folder is
    removed (best effort).

    Returns ``{'success': True}`` (with a ``message`` for intermediate
    chunks); 400 for invalid input; 500 with a description otherwise.
    """
    from html import escape  # function-scope import keeps this block self-contained

    try:
        # silent=True: a missing or malformed body becomes None and is
        # answered with 400 instead of surfacing as a 500.
        chunk_data = request.get_json(silent=True)
        if not chunk_data:
            return jsonify({'error': 'No data received'}), 400
        if not isinstance(chunk_data, dict):
            return jsonify({'error': 'Invalid data format: expected a JSON object'}), 400

        app.logger.debug("Received chunk data: %s", list(chunk_data))

        required_fields = ['chunkNumber', 'totalChunks', 'date', 'time', 'chunk']
        missing_fields = [field for field in required_fields if field not in chunk_data]
        if missing_fields:
            return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400

        try:
            chunk_number = int(chunk_data['chunkNumber'])
            total_chunks = int(chunk_data['totalChunks'])
            date = str(chunk_data['date'])
            time = str(chunk_data['time'])
            chunk = chunk_data['chunk']
            summary = chunk_data.get('summary', '')
        except (ValueError, TypeError) as e:
            return jsonify({'error': f'Invalid data format: {str(e)}'}), 400

        if not isinstance(chunk, list):
            return jsonify({'error': 'Chunk must be an array'}), 400

        # Chunks are numbered 0 .. totalChunks - 1; anything else would be
        # stored but never assembled.
        if not 0 <= chunk_number < total_chunks:
            return jsonify({'error': 'Invalid data format: chunkNumber out of range'}), 400

        # date and time are client-controlled and become path components;
        # refuse anything that could address a different directory.
        for component in (date, time):
            if (not component
                    or component in ('.', '..')
                    or '/' in component
                    or '\\' in component):
                return jsonify({'error': 'Invalid data format: unsafe date or time'}), 400

        try:
            date_dir = Path('Reports') / date
            temp_dir = date_dir / 'temp'
            temp_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            return jsonify({'error': f'Failed to create directories: {str(e)}'}), 500

        try:
            chunk_file = temp_dir / f'chunk_{chunk_number}.json'
            chunk_data_to_save = {
                'data': chunk,
                # Only the first chunk carries the summary.
                'summary': summary if chunk_number == 0 else '',
            }
            with open(chunk_file, 'w', encoding='utf-8') as f:
                json.dump(chunk_data_to_save, f)
        except Exception as e:
            return jsonify({'error': f'Failed to save chunk: {str(e)}'}), 500

        if chunk_number != total_chunks - 1:
            return jsonify({'success': True, 'message': 'Chunk received'})

        # Final chunk: assemble every stored chunk into the report.
        try:
            all_data = []
            summary_content = ''
            for i in range(total_chunks):
                chunk_file = temp_dir / f'chunk_{i}.json'
                if not chunk_file.exists():
                    return jsonify({'error': f'Missing chunk file: chunk_{i}.json'}), 500

                with open(chunk_file, 'r', encoding='utf-8') as f:
                    chunk_content = json.load(f)
                all_data.extend(chunk_content['data'])
                if chunk_content['summary']:
                    summary_content = chunk_content['summary']

            template_path = Path('templates/report_template.html')
            multiple_path = Path('templates/multiple.html')
            if not template_path.exists() or not multiple_path.exists():
                return jsonify({'error': 'Template files not found'}), 500

            # Borrow the stylesheet of multiple.html; an absent <style>
            # element simply contributes no styles.
            with open(multiple_path, 'r', encoding='utf-8') as f:
                multiple_content = f.read()
            style_open = multiple_content.find('<style>')
            style_close = multiple_content.find('</style>', style_open)
            if style_open != -1 and style_close != -1:
                styles = multiple_content[style_open + len('<style>'):style_close].strip()
            else:
                styles = ''

            with open(template_path, 'r', encoding='utf-8') as f:
                template = f.read()
            template = template.replace('{multiple_styles}', styles)

            # Double every brace so literal CSS/JS braces survive str.format,
            # then restore the four real placeholders.
            template = template.replace('{', '{{').replace('}', '}}')
            for placeholder in ('date', 'time', 'summary_content', 'table_rows'):
                template = template.replace('{{' + placeholder + '}}', '{' + placeholder + '}')

            # Only failed items are listed; they are renumbered from 1.
            table_rows = []
            for item in all_data:
                try:
                    status = item['status']
                    if status.lower() != 'fail':
                        continue

                    labels_cell = '-'
                    if item['labels']:
                        labels = ' '.join(
                            f'<span class="label">{escape(str(label))}</span>'
                            for label in item['labels']
                        )
                        labels_cell = f'<div class="labels">{labels}</div>'

                    serial = len(table_rows) + 1
                    image_src = escape(str(item['image']))
                    file_label = escape(str(item['filename']))
                    status_text = escape(status)
                except KeyError as e:
                    return jsonify({'error': f'Missing required field in data: {str(e)}'}), 500

                table_rows.append(f"""
                    <tr>
                        <td class="serial">{serial}</td>
                        <td class="image-col">
                            <img src="{image_src}" alt="{file_label}"
                                 class="thumbnail" onclick="openModal(this)"
                                 style="cursor: pointer;">
                            <div class="filename">{file_label}</div>
                        </td>
                        <td class="result-col">
                            <span class="result-fail">{status_text}</span>
                        </td>
                        <td class="labels-col">
                            {labels_cell}
                        </td>
                    </tr>
                """)

            try:
                # NOTE(review): summary_content is inserted unescaped,
                # presumably because the client sends ready-made HTML —
                # confirm it is trusted.
                html_content = template.format(
                    date=date,
                    time=time,
                    summary_content=summary_content,
                    table_rows='\n'.join(table_rows),
                )
            except Exception as e:
                return jsonify({'error': f'Failed to generate HTML: {str(e)}'}), 500

            try:
                report_path = date_dir / f'Report_{date}_{time}.html'
                with open(report_path, 'w', encoding='utf-8') as f:
                    f.write(html_content)
            except Exception as e:
                return jsonify({'error': f'Failed to save report: {str(e)}'}), 500

            # Cleanup is best effort: the report already exists at this point.
            try:
                shutil.rmtree(temp_dir)
            except Exception as e:
                app.logger.warning("Failed to clean up temp directory: %s", e)

            return jsonify({'success': True})
        except Exception as e:
            return jsonify({'error': f'Error processing final chunk: {str(e)}'}), 500

    except Exception as e:
        app.logger.exception("Error in save_report_chunk")
        return jsonify({'error': str(e)}), 500
|
| |
|
if __name__ == '__main__':
    # Empty both upload folders before the server starts.
    for config_key in ('UPLOAD_FOLDER_SINGLE', 'UPLOAD_FOLDER_MULTIPLE'):
        clear_uploads(app.config[config_key])

    # Listen on every interface, port 7860, with debug mode off.
    app.run(debug=False, port=7860, host='0.0.0.0')