|
|
import io
|
|
|
import logging
|
|
|
import wave
|
|
|
from pathlib import Path
|
|
|
import struct
|
|
|
|
|
|
from flask import Flask, Response, jsonify, render_template, request, send_file, stream_with_context
|
|
|
from flask_cors import CORS
|
|
|
from piper import PiperVoice
|
|
|
|
|
|
|
|
|
# Configure the root logger once at import time; INFO level keeps the
# model-discovery messages emitted by this module visible.
logging.basicConfig(
    level=logging.INFO,
)

# Module-scoped logger used by every handler and helper in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# The WSGI application object; route decorators below register against it.
app = Flask(__name__)

# Enable cross-origin requests for the application (flask_cors defaults).
CORS(app)
|
|
|
|
|
|
|
|
|
# Cache of loaded voices, keyed by voice name, so each model is loaded once
# per process rather than once per request.
tts_instances = dict()

# Primary directory searched for "<voice>.onnx" model files: a "voices"
# folder sitting next to this source file.
VOICES_DIR = Path(__file__).parent.joinpath("voices")
|
|
|
|
|
|
def _is_safe_voice_name(voice):
    """Return True if *voice* is a non-empty relative name with no ``..`` parts."""
    if not isinstance(voice, str) or not voice or "\x00" in voice:
        return False
    name = Path(voice)
    # An anchor (root or drive) makes ``base / name`` discard ``base``
    # entirely, and a ".." component climbs out of it.
    return not name.anchor and ".." not in name.parts


def get_tts_instance(voice):
    """
    Return a cached PiperVoice for *voice*, loading it on first use.

    The model file ``<voice>.onnx`` and its config ``<voice>.onnx.json`` are
    searched for, in order, in VOICES_DIR, in the directory containing this
    file, and relative to the current working directory. The first location
    that holds BOTH files is used.

    Args:
        voice: Voice name (without extension). Callers in this file pass it
            straight from the request query string, so it is validated here:
            absolute paths and names containing ``..`` are rejected.

    Returns:
        The loaded PiperVoice instance, or None if the name is rejected, the
        files cannot be found, or loading raises. Failures are logged and
        are not cached, so a later call retries.
    """
    if voice in tts_instances:
        return tts_instances[voice]

    if not _is_safe_voice_name(voice):
        logger.error("Rejected unsafe voice name: %r", voice)
        return None

    logger.info("Creating new PiperVoice instance for voice: %s", voice)
    try:
        candidates = [
            VOICES_DIR / f"{voice}.onnx",
            Path(__file__).parent / f"{voice}.onnx",
            Path(f"{voice}.onnx"),
        ]

        model_path, config_path = None, None
        for model_file in candidates:
            # The config always sits beside the model as "<model>.json".
            config_file = model_file.with_name(model_file.name + ".json")
            # Require the complete pair so a stray model without a config in
            # an earlier location cannot hide a usable pair further down.
            if model_file.exists() and config_file.exists():
                model_path, config_path = str(model_file), str(config_file)
                logger.info("Found model at: %s", model_path)
                logger.info("Found config at: %s", config_path)
                break

        if not model_path or not config_path:
            logger.error(
                "Voice model or config not found for '%s'. "
                "Ensure both '.onnx' and '.onnx.json' are present.",
                voice,
            )
            return None

        tts_instances[voice] = PiperVoice.load(model_path, config_path=config_path)
    except Exception as e:
        # Boundary handler: any load failure is reported to the caller as
        # None rather than propagating into the request handler.
        logger.error(
            "Failed to create PiperVoice instance for voice %s: %s",
            voice,
            e,
            exc_info=True,
        )
        return None

    return tts_instances[voice]
|
|
|
|
|
|
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
|
|
|
|
|
@app.route('/api/tts', methods=['GET'])
def synthesize_audio_full():
    """
    Synthesize the whole utterance in memory and return it as a WAV download.

    Query parameters:
        text:  Text to speak (required; a missing or empty value yields 400).
        voice: Voice name, defaulting to 'en_GB-alba-medium'.

    Returns:
        A WAV attachment named 'output.wav' on success, or a JSON error body
        with status 400 (no text) or 500 (voice unavailable or synthesis
        failure).
    """
    params = request.args
    text = params.get('text')
    voice = params.get('voice', 'en_GB-alba-medium')

    if not text:
        return jsonify({"error": "Text to synthesize is required."}), 400

    tts_instance = get_tts_instance(voice)
    if not tts_instance:
        return jsonify({"error": f"Could not load voice model for '{voice}'."}), 500

    try:
        buffer = io.BytesIO()
        with wave.open(buffer, 'wb') as writer:
            # Output format is fixed to mono, 2 bytes per sample; only the
            # sample rate is taken from the loaded voice's config.
            writer.setnchannels(1)
            writer.setsampwidth(2)
            writer.setframerate(tts_instance.config.sample_rate)

            for chunk in tts_instance.synthesize(text):
                pcm_bytes = chunk.audio_int16_bytes
                writer.writeframes(pcm_bytes)

        # Rewind so send_file reads the finished WAV from its first byte.
        buffer.seek(0)

        return send_file(
            buffer,
            mimetype='audio/wav',
            as_attachment=True,
            download_name='output.wav',
        )
    except Exception as e:
        logger.error(f"Error during full synthesis: {e}", exc_info=True)
        failure = {"error": f"Failed to synthesize audio: {str(e)}"}
        return jsonify(failure), 500
|
|
|
|
|
|
def _create_wav_header(sample_rate, bits_per_sample=16, channels=1):
    """
    Build a 44-byte PCM WAV header for a stream whose length is not known.

    Both size fields must fit in an unsigned 32-bit integer. The RIFF chunk
    size is therefore pinned to the maximum (0xFFFFFFFF) and the data size is
    derived from it, keeping the usual ``riff_size == data_size + 36``
    relationship without overflowing ``struct``'s ``I`` format.
    """
    riff_size = 0xFFFFFFFF
    data_size = riff_size - 36
    return struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF',
        riff_size,
        b'WAVE',
        b'fmt ',
        16,                                             # fmt chunk length
        1,                                              # format tag 1 = PCM
        channels,
        sample_rate,
        sample_rate * channels * bits_per_sample // 8,  # bytes per second
        channels * bits_per_sample // 8,                # block align
        bits_per_sample,
        b'data',
        data_size,
    )


def generate_audio_stream(tts_instance, text):
    """
    Yield a WAV header followed by raw PCM chunks as they are synthesized.

    Args:
        tts_instance: Loaded voice; must expose ``config.sample_rate`` and a
            ``synthesize(text)`` iterable whose items carry
            ``audio_int16_bytes``.
        text: The text to synthesize.

    Yields:
        bytes: First the 44-byte header (16-bit mono at the voice's sample
        rate), then one bytes object per synthesized chunk.

    Any exception raised while building the header or synthesizing is logged
    and ends the stream early instead of propagating to the caller.
    """
    try:
        yield _create_wav_header(tts_instance.config.sample_rate)

        for audio_chunk in tts_instance.synthesize(text):
            yield audio_chunk.audio_int16_bytes
    except Exception as e:
        logger.error(f"Error during stream generation: {e}", exc_info=True)
|
|
|
|
|
|
|
|
|
@app.route('/api/tts-stream', methods=['GET'])
def synthesize_audio_stream():
    """
    Stream synthesized audio to the client while it is being generated.

    Query parameters:
        text:  Text to speak (required; a missing or empty value yields 400).
        voice: Voice name, defaulting to 'en_GB-alba-medium'.

    Returns:
        A streaming 'audio/wav' response fed by generate_audio_stream, or a
        JSON error body with status 400 (no text) or 500 (voice unavailable).
    """
    params = request.args
    text = params.get('text')
    voice = params.get('voice', 'en_GB-alba-medium')

    if not text:
        return jsonify({"error": "Text to synthesize is required."}), 400

    tts_instance = get_tts_instance(voice)
    if not tts_instance:
        return jsonify({"error": f"Could not load voice model for '{voice}'."}), 500

    # stream_with_context keeps the request context available for as long
    # as the generator is being consumed by the response.
    audio_chunks = stream_with_context(generate_audio_stream(tts_instance, text))
    return Response(audio_chunks, mimetype='audio/wav')
|
|
|
|
|
|
if __name__ == '__main__':
    # Development entry point only. NOTE: debug=True turns on Flask's
    # reloader and interactive debugger; do not expose this to untrusted
    # networks.
    app.run(port=5001, debug=True)