#!/usr/bin/env python import os import time import json import pika import logging from pydub import AudioSegment from datetime import datetime # Configuración de logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('splitter') # Configuración de RabbitMQ RABBITMQ_HOST = 'rabbitmq-app.whisper.svc.cluster.local' RABBITMQ_USER = 'user' RABBITMQ_PASS = 'password' SPLIT_QUEUE = 'audio_split_queue' PROCESS_QUEUE = 'audio_process_queue' # Configuración de segmentos SEGMENT_DURATION_MS = 2 * 60 * 1000 # 1 minuto en milisegundos # Directorios SHARED_DIR = '/app/shared' def connect_to_rabbitmq(): """Establece conexión con RabbitMQ""" tries = 0 while True: try: credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) connection = pika.BlockingConnection( pika.ConnectionParameters( host=RABBITMQ_HOST, credentials=credentials ) ) return connection except pika.exceptions.AMQPConnectionError: tries += 1 logger.warning(f"Intento {tries}: No se pudo conectar a RabbitMQ. Reintentando en 5 segundos...") time.sleep(5) def split_audio(filepath, file_id, original_filename): """Divide un archivo de audio en segmentos de 5 minutos""" logger.info(f"Dividiendo archivo: {filepath}") # Cargar audio con pydub audio = AudioSegment.from_file(filepath) # Calcular número de segmentos total_duration_ms = len(audio) num_segments = (total_duration_ms + SEGMENT_DURATION_MS - 1) // SEGMENT_DURATION_MS logger.info(f"Audio de duración {total_duration_ms}ms será dividido en {num_segments} segmentos") # Crear directorio para segmentos segments_dir = os.path.join(SHARED_DIR, f"segments_{file_id}") os.makedirs(segments_dir, exist_ok=True) # Dividir en segmentos segments_info = [] for i in range(num_segments): start_ms = i * SEGMENT_DURATION_MS end_ms = min(start_ms + SEGMENT_DURATION_MS, total_duration_ms) # Extraer segmento segment = audio[start_ms:end_ms] # Generar nombre de archivo para segmento segment_filename = f"segment_{i:03d}_{file_id}.wav" segment_path = os.path.join(segments_dir, segment_filename) # Guardar segmento como WAV para Whisper segment.export(segment_path, format="wav") segments_info.append({ 'segment_id': i, 'segment_path': segment_path, 'segment_filename': segment_filename, 'start_ms': start_ms, 'end_ms': end_ms, 'duration_ms': end_ms - start_ms }) logger.info(f"Creado segmento {i+1}/{num_segments}: {segment_filename}") return segments_info, segments_dir def send_segments_to_process_queue(segments_info, original_file_id, original_filename, segments_dir): """Envía los segmentos a la cola para procesamiento con Whisper""" connection = connect_to_rabbitmq() channel = connection.channel() # Declarar cola para procesamiento channel.queue_declare(queue=PROCESS_QUEUE, durable=True) # Información del trabajo completo job_info = { 'original_file_id': original_file_id, 'original_filename': original_filename, 'total_segments': len(segments_info), 'segments_dir': segments_dir, 'timestamp': datetime.now().isoformat() } # Enviar cada segmento a la cola for segment in segments_info: # Mensaje con información del segmento y trabajo message = { **segment, **job_info } channel.basic_publish( exchange='', routing_key=PROCESS_QUEUE, body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # mensaje persistente ) ) logger.info(f"Segmento {segment['segment_id']+1}/{job_info['total_segments']} enviado a la cola de procesamiento") connection.close() def callback(ch, method, properties, body): """Callback para procesar mensajes de la cola de división""" try: # Decodificar mensaje message = json.loads(body) file_id = message['file_id'] filepath = message['filepath'] filename = message['filename'] logger.info(f"Recibido archivo para división: {filename} ({file_id})") # Dividir audio en segmentos segments_info, segments_dir = split_audio(filepath, file_id, filename) # Enviar segmentos a la cola de procesamiento send_segments_to_process_queue(segments_info, file_id, filename, segments_dir) # Confirmar procesamiento ch.basic_ack(delivery_tag=method.delivery_tag) logger.info(f"División completada para {filename} ({file_id})") except Exception as e: logger.error(f"Error procesando mensaje: {str(e)}") # Rechazar mensaje en caso de error para reintentarlo ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) def main(): """Función principal""" connection = connect_to_rabbitmq() channel = connection.channel() # Declarar colas channel.queue_declare(queue=SPLIT_QUEUE, durable=True) channel.queue_declare(queue=PROCESS_QUEUE, durable=True) # Configurar prefetch count channel.basic_qos(prefetch_count=1) # Configurar callback channel.basic_consume(queue=SPLIT_QUEUE, on_message_callback=callback) logger.info("Servicio de división iniciado. Esperando mensajes...") # Iniciar consumo try: channel.start_consuming() except KeyboardInterrupt: logger.info("Servicio de división detenido") channel.stop_consuming() connection.close() if __name__ == "__main__": main()