Spaces:
Running
Running
| import streamlit as st | |
| import sparknlp | |
| import os | |
| import pandas as pd | |
| import librosa | |
| from sparknlp.base import * | |
| from sparknlp.common import * | |
| from sparknlp.annotator import * | |
| from pyspark.ml import Pipeline | |
| from sparknlp.pretrained import PretrainedPipeline | |
| from pyspark.sql.types import * | |
| import pyspark.sql.functions as F | |
| # Page configuration | |
| st.set_page_config( | |
| layout="wide", | |
| initial_sidebar_state="auto" | |
| ) | |
| # Custom CSS for styling | |
| st.markdown(""" | |
| <style> | |
| .main-title { | |
| font-size: 36px; | |
| color: #4A90E2; | |
| font-weight: bold; | |
| text-align: center; | |
| } | |
| .section { | |
| background-color: #f9f9f9; | |
| padding: 10px; | |
| border-radius: 10px; | |
| margin-top: 10px; | |
| } | |
| .section p, .section ul { | |
| color: #666666; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| def init_spark(): | |
| """Initialize Spark NLP.""" | |
| return sparknlp.start() | |
| def create_pipeline(model): | |
| """Create a Spark NLP pipeline for audio processing.""" | |
| audioAssembler = AudioAssembler() \ | |
| .setInputCol("audio_content") \ | |
| .setOutputCol("audio_assembler") | |
| speechToText = WhisperForCTC.pretrained("asr_whisper_small_english","en") \ | |
| .setInputCols(["audio_assembler"]) \ | |
| .setOutputCol("text") | |
| pipeline = Pipeline(stages=[ | |
| audioAssembler, | |
| speechToText | |
| ]) | |
| return pipeline | |
| def fit_data(pipeline, fed_data): | |
| """Fit the data into the pipeline and return the transcription.""" | |
| data, sampling_rate = librosa.load(fed_data, sr=16000) | |
| data = data.tolist() | |
| spark_df = spark.createDataFrame([[data]], ["audio_content"]) | |
| model = pipeline.fit(spark_df) | |
| lp = LightPipeline(model) | |
| lp_result = lp.fullAnnotate(data)[0] | |
| return lp_result | |
| def save_uploadedfile(uploadedfile, path): | |
| """Save the uploaded file to the specified path.""" | |
| filepath = os.path.join(path, uploadedfile.name) | |
| with open(filepath, "wb") as f: | |
| if hasattr(uploadedfile, 'getbuffer'): | |
| f.write(uploadedfile.getbuffer()) | |
| else: | |
| f.write(uploadedfile.read()) | |
| # Sidebar content | |
| model_list = ["asr_whisper_small_english"] | |
| model = st.sidebar.selectbox( | |
| "Choose the pretrained model", | |
| model_list, | |
| help="For more info about the models visit: https://sparknlp.org/models" | |
| ) | |
| # Main content | |
| st.markdown('<div class="main-title">Speech Recognition With WhisperForCTC</div>', unsafe_allow_html=True) | |
| st.markdown('<div class="section"><p>This demo transcribes audio files into texts using the <code>WhisperForCTC</code> Annotator and advanced speech recognition models.</p></div>', unsafe_allow_html=True) | |
| # Reference notebook link in sidebar | |
| st.sidebar.markdown('Reference notebook:') | |
| st.sidebar.markdown(""" | |
| <a href="https://github.com/JohnSnowLabs/spark-nlp/blob/master/examples/python/annotation/audio/whisper/Automatic_Speech_Recognition_Whisper_(WhisperForCTC).ipynb"> | |
| <img src="https://colab.research.google.com/assets/colab-badge.svg" style="zoom: 1.3" alt="Open In Colab"/> | |
| </a> | |
| """, unsafe_allow_html=True) | |
| # Load examples | |
| AUDIO_FILE_PATH = "inputs" | |
| audio_files = sorted(os.listdir(AUDIO_FILE_PATH)) | |
| selected_audio = st.selectbox("Select an audio", audio_files) | |
| # Creating a simplified Python list of audio file types | |
| audio_file_types = ["mp3", "flac", "wav", "aac", "ogg", "aiff", "wma", "m4a", "ape", "dsf", "dff", "midi", "mid", "opus", "amr"] | |
| uploadedfile = st.file_uploader("Try it for yourself!", type=audio_file_types) | |
| if uploadedfile: | |
| selected_audio = f"{AUDIO_FILE_PATH}/{uploadedfile.name}" | |
| save_uploadedfile(uploadedfile, AUDIO_FILE_PATH) | |
| elif selected_audio: | |
| selected_audio = f"{AUDIO_FILE_PATH}/{selected_audio}" | |
| # Audio playback and transcription | |
| st.subheader("Play Audio") | |
| with open(selected_audio, 'rb') as audio_file: | |
| audio_bytes = audio_file.read() | |
| st.audio(audio_bytes) | |
| spark = init_spark() | |
| pipeline = create_pipeline(model) | |
| output = fit_data(pipeline, selected_audio) | |
| st.subheader(f"Transcription:") | |
| st.markdown(f"{(output['text'][0].result).title()}") |