#!/usr/bin/env python # Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Google Cloud Text-To-Speech API streaming sample with input/output streams.""" from google.cloud import texttospeech import itertools import queue import threading class TTSStreamer: def __init__(self): self.client = texttospeech.TextToSpeechClient() self.text_queue = queue.Queue() self.audio_queue = queue.Queue() def start_stream(self): streaming_config = texttospeech.StreamingSynthesizeConfig( voice=texttospeech.VoiceSelectionParams( name="en-US-Journey-D", language_code="en-US" ) ) config_request = texttospeech.StreamingSynthesizeRequest( streaming_config=streaming_config ) def request_generator(): while True: try: text = self.text_queue.get() if text is None: # Poison pill to stop break yield texttospeech.StreamingSynthesizeRequest( input=texttospeech.StreamingSynthesisInput(text=text) ) except queue.Empty: continue def audio_processor(): responses = self.client.streaming_synthesize( itertools.chain([config_request], request_generator()) ) for response in responses: self.audio_queue.put(response.audio_content) self.processor_thread = threading.Thread(target=audio_processor) self.processor_thread.start() def send_text(self, text: str): """Send text to be synthesized.""" self.text_queue.put(text) def get_audio(self): """Get the next chunk of audio bytes.""" try: return self.audio_queue.get_nowait() except queue.Empty: return None def stop(self): """Stop the streaming synthesis.""" self.text_queue.put(None) # Send poison pill if self.processor_thread: self.processor_thread.join() def main(): tts = TTSStreamer() tts.start_stream() # Example usage try: while True: text = input("Enter text (or 'q' to quit): ") if text.lower() == 'q': break tts.send_text(text) # Get and print audio bytes while True: audio_chunk = tts.get_audio() if audio_chunk is None: break print(f"Received audio chunk of {len(audio_chunk)} bytes") finally: tts.stop() if __name__ == "__main__": main()