Files
transcripcion-whisper/splitter/app.py

187 lines
5.9 KiB
Python
Raw Normal View History

2025-05-30 19:09:28 +02:00
#!/usr/bin/env python
import os
import time
import json
import pika
import logging
from pydub import AudioSegment
from datetime import datetime
# Configuración de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('splitter')
# Configuración de RabbitMQ
RABBITMQ_HOST = 'rabbitmq'
RABBITMQ_USER = 'user'
RABBITMQ_PASS = 'password'
SPLIT_QUEUE = 'audio_split_queue'
PROCESS_QUEUE = 'audio_process_queue'
# Configuración de segmentos
SEGMENT_DURATION_MS = 2 * 60 * 1000 # 1 minuto en milisegundos
# Directorios
SHARED_DIR = '/app/shared'
def connect_to_rabbitmq():
"""Establece conexión con RabbitMQ"""
tries = 0
while True:
try:
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBITMQ_HOST,
credentials=credentials
)
)
return connection
except pika.exceptions.AMQPConnectionError:
tries += 1
logger.warning(f"Intento {tries}: No se pudo conectar a RabbitMQ. Reintentando en 5 segundos...")
time.sleep(5)
def split_audio(filepath, file_id, original_filename):
"""Divide un archivo de audio en segmentos de 5 minutos"""
logger.info(f"Dividiendo archivo: {filepath}")
# Cargar audio con pydub
audio = AudioSegment.from_file(filepath)
# Calcular número de segmentos
total_duration_ms = len(audio)
num_segments = (total_duration_ms + SEGMENT_DURATION_MS - 1) // SEGMENT_DURATION_MS
logger.info(f"Audio de duración {total_duration_ms}ms será dividido en {num_segments} segmentos")
# Crear directorio para segmentos
segments_dir = os.path.join(SHARED_DIR, f"segments_{file_id}")
os.makedirs(segments_dir, exist_ok=True)
# Dividir en segmentos
segments_info = []
for i in range(num_segments):
start_ms = i * SEGMENT_DURATION_MS
end_ms = min(start_ms + SEGMENT_DURATION_MS, total_duration_ms)
# Extraer segmento
segment = audio[start_ms:end_ms]
# Generar nombre de archivo para segmento
segment_filename = f"segment_{i:03d}_{file_id}.wav"
segment_path = os.path.join(segments_dir, segment_filename)
# Guardar segmento como WAV para Whisper
segment.export(segment_path, format="wav")
segments_info.append({
'segment_id': i,
'segment_path': segment_path,
'segment_filename': segment_filename,
'start_ms': start_ms,
'end_ms': end_ms,
'duration_ms': end_ms - start_ms
})
logger.info(f"Creado segmento {i+1}/{num_segments}: {segment_filename}")
return segments_info, segments_dir
def send_segments_to_process_queue(segments_info, original_file_id, original_filename, segments_dir):
"""Envía los segmentos a la cola para procesamiento con Whisper"""
connection = connect_to_rabbitmq()
channel = connection.channel()
# Declarar cola para procesamiento
channel.queue_declare(queue=PROCESS_QUEUE, durable=True)
# Información del trabajo completo
job_info = {
'original_file_id': original_file_id,
'original_filename': original_filename,
'total_segments': len(segments_info),
'segments_dir': segments_dir,
'timestamp': datetime.now().isoformat()
}
# Enviar cada segmento a la cola
for segment in segments_info:
# Mensaje con información del segmento y trabajo
message = {
**segment,
**job_info
}
channel.basic_publish(
exchange='',
routing_key=PROCESS_QUEUE,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2 # mensaje persistente
)
)
logger.info(f"Segmento {segment['segment_id']+1}/{job_info['total_segments']} enviado a la cola de procesamiento")
connection.close()
def callback(ch, method, properties, body):
"""Callback para procesar mensajes de la cola de división"""
try:
# Decodificar mensaje
message = json.loads(body)
file_id = message['file_id']
filepath = message['filepath']
filename = message['filename']
logger.info(f"Recibido archivo para división: {filename} ({file_id})")
# Dividir audio en segmentos
segments_info, segments_dir = split_audio(filepath, file_id, filename)
# Enviar segmentos a la cola de procesamiento
send_segments_to_process_queue(segments_info, file_id, filename, segments_dir)
# Confirmar procesamiento
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"División completada para {filename} ({file_id})")
except Exception as e:
logger.error(f"Error procesando mensaje: {str(e)}")
# Rechazar mensaje en caso de error para reintentarlo
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def main():
"""Función principal"""
connection = connect_to_rabbitmq()
channel = connection.channel()
# Declarar colas
channel.queue_declare(queue=SPLIT_QUEUE, durable=True)
channel.queue_declare(queue=PROCESS_QUEUE, durable=True)
# Configurar prefetch count
channel.basic_qos(prefetch_count=1)
# Configurar callback
channel.basic_consume(queue=SPLIT_QUEUE, on_message_callback=callback)
logger.info("Servicio de división iniciado. Esperando mensajes...")
# Iniciar consumo
try:
channel.start_consuming()
except KeyboardInterrupt:
logger.info("Servicio de división detenido")
channel.stop_consuming()
connection.close()
if __name__ == "__main__":
main()