File size: 5,701 Bytes
c0b6368 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
import io
import logging
import wave
from pathlib import Path
import struct
from flask import Flask, Response, jsonify, render_template, request, send_file, stream_with_context
from flask_cors import CORS
from piper import PiperVoice
# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)
# Enable CORS for all routes of the app.
CORS(app)

# In-memory cache for PiperVoice instances, keyed by voice name.
# Filled lazily by get_tts_instance().
tts_instances = {}

# Directory where voice models are stored: a "voices" folder located next to
# this file. It is the first location get_tts_instance() searches.
VOICES_DIR = Path(__file__).parent / "voices"
def get_tts_instance(voice):
    """
    Return the cached PiperVoice for ``voice``, loading it on first use.

    The model ``<voice>.onnx`` and its ``<voice>.onnx.json`` config are
    searched for, in order, in VOICES_DIR, the directory containing this
    file, and the current working directory. The first location that holds
    both files wins.

    Args:
        voice: Bare voice name without extension (e.g. "en_GB-alba-medium").
            The routes pass it straight from the query string, so it is
            treated as untrusted: empty names and names that contain path
            components are rejected.

    Returns:
        The loaded PiperVoice instance, or None when the name is invalid,
        the model/config pair cannot be found, or loading fails. Failures
        are logged instead of raised.
    """
    # Only successfully loaded (and therefore validated) names are cached.
    if voice in tts_instances:
        return tts_instances[voice]

    # Path(voice).name differs from voice as soon as the value contains a
    # directory separator or drive prefix. Rejecting those keeps a request
    # such as "?voice=../../x" from probing files outside the search dirs.
    if not voice or Path(voice).name != voice:
        logger.error("Rejected invalid voice name: %r", voice)
        return None

    logger.info("Creating new PiperVoice instance for voice: %s", voice)
    try:
        model_path, config_path = None, None
        file_name = f"{voice}.onnx"
        candidates = (
            VOICES_DIR / file_name,
            Path(__file__).parent / file_name,
            Path(file_name),
        )
        for candidate in candidates:
            if not candidate.exists():
                continue
            model_path = str(candidate)
            # "x.onnx" -> "x.onnx.json": the config sits beside the model.
            candidate_config = candidate.with_suffix(".onnx.json")
            if candidate_config.exists():
                config_path = str(candidate_config)
                logger.info("Found model at: %s", model_path)
                logger.info("Found config at: %s", config_path)
                break

        if not model_path or not config_path:
            logger.error(
                "Voice model or config not found for '%s'. "
                "Ensure both '.onnx' and '.onnx.json' are present.",
                voice,
            )
            return None

        tts_instances[voice] = PiperVoice.load(model_path, config_path=config_path)
    except Exception as e:
        # Loading is best-effort: callers turn None into an HTTP error.
        logger.error(
            "Failed to create PiperVoice instance for voice %s: %s",
            voice,
            e,
            exc_info=True,
        )
        return None
    return tts_instances[voice]
@app.route('/')
def index():
    """Render the ``index.html`` template and return it as the landing page."""
    page = render_template('index.html')
    return page
@app.route('/api/tts', methods=['GET'])
def synthesize_audio_full():
    """
    Synthesize the complete utterance and return it as a WAV attachment.

    Query parameters:
        text: The text to speak (required).
        voice: Voice name; defaults to 'en_GB-alba-medium'.

    Responds with a 400 JSON error when ``text`` is missing or empty, and
    with a 500 JSON error when the voice cannot be loaded or synthesis fails.
    """
    query = request.args
    text = query.get('text')
    voice = query.get('voice', 'en_GB-alba-medium')

    if not text:
        missing_text = {"error": "Text to synthesize is required."}
        return jsonify(missing_text), 400

    tts_instance = get_tts_instance(voice)
    if not tts_instance:
        unavailable = {"error": f"Could not load voice model for '{voice}'."}
        return jsonify(unavailable), 500

    try:
        buffer = io.BytesIO()
        with wave.open(buffer, 'wb') as writer:
            # Mono, 16-bit PCM at the voice's configured sample rate.
            writer.setnchannels(1)
            writer.setsampwidth(2)
            writer.setframerate(tts_instance.config.sample_rate)

            # Each synthesized chunk exposes its PCM data as raw bytes.
            for chunk in tts_instance.synthesize(text):
                writer.writeframes(chunk.audio_int16_bytes)

        # Rewind so send_file reads the WAV from its first byte.
        buffer.seek(0)
        return send_file(
            buffer,
            download_name='output.wav',
            as_attachment=True,
            mimetype='audio/wav',
        )
    except Exception as e:
        logger.error(f"Error during full synthesis: {e}", exc_info=True)
        failure = {"error": f"Failed to synthesize audio: {str(e)}"}
        return jsonify(failure), 500
def _streaming_wav_header(sample_rate, bits_per_sample=16, channels=1):
    """
    Build the 44-byte PCM WAV header for a stream of unknown length.

    Both size fields (RIFF chunk size and data chunk size) are unsigned
    32-bit integers. Because the total length is not known up front, each is
    set to the maximum value 0xFFFFFFFF. Adding the 36 header bytes to that
    maximum would overflow the field and make ``struct.pack`` raise.
    """
    unknown_size = 0xFFFFFFFF
    byte_rate = sample_rate * channels * bits_per_sample // 8
    block_align = channels * bits_per_sample // 8
    return struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF', unknown_size, b'WAVE',
        b'fmt ', 16,          # size of the fmt chunk body
        1,                    # audio format 1 = PCM
        channels,
        sample_rate,
        byte_rate,
        block_align,
        bits_per_sample,
        b'data', unknown_size,
    )


def generate_audio_stream(tts_instance, text):
    """
    Yield a WAV stream for ``text``: first the header, then raw PCM chunks.

    Args:
        tts_instance: Voice object providing ``config.sample_rate`` and a
            ``synthesize(text)`` iterable whose items expose
            ``audio_int16_bytes``.
        text: The text to synthesize.

    Yields:
        bytes: The 44-byte WAV header, followed by each chunk's audio bytes.

    Any exception raised while generating is logged and ends the stream
    early instead of propagating.
    """
    try:
        # 1. Header first, so the client can start decoding immediately.
        yield _streaming_wav_header(tts_instance.config.sample_rate)
        # 2. Then the audio itself, chunk by chunk as it is produced.
        for audio_chunk in tts_instance.synthesize(text):
            yield audio_chunk.audio_int16_bytes
    except Exception as e:
        logger.error(f"Error during stream generation: {e}", exc_info=True)
@app.route('/api/tts-stream', methods=['GET'])
def synthesize_audio_stream():
    """
    Stream synthesized audio to the client while it is being generated.

    Query parameters:
        text: The text to speak (required).
        voice: Voice name; defaults to 'en_GB-alba-medium'.

    Responds with a 400 JSON error when ``text`` is missing or empty, and
    with a 500 JSON error when the voice cannot be loaded. Otherwise returns
    a streaming 'audio/wav' response.
    """
    query = request.args
    text = query.get('text')
    voice = query.get('voice', 'en_GB-alba-medium')

    if not text:
        missing_text = {"error": "Text to synthesize is required."}
        return jsonify(missing_text), 400

    tts_instance = get_tts_instance(voice)
    if not tts_instance:
        unavailable = {"error": f"Could not load voice model for '{voice}'."}
        return jsonify(unavailable), 500

    # Keep the request context alive for as long as the generator runs.
    audio = stream_with_context(generate_audio_stream(tts_instance, text))
    return Response(audio, mimetype='audio/wav')
if __name__ == '__main__':
    # Development entry point: run Flask's built-in server on port 5001.
    # NOTE(review): debug=True turns on the interactive debugger and the
    # reloader. Keep this for local development only; confirm a production
    # deployment does not start the app through this path.
    app.run(debug=True, port=5001)