Simplificación de ficheros
This commit is contained in:
187
splitter/app.py
Normal file
187
splitter/app.py
Normal file
@@ -0,0 +1,187 @@
|
||||
#!/usr/bin/env python
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import pika
|
||||
import logging
|
||||
from pydub import AudioSegment
|
||||
from datetime import datetime
|
||||
|
||||
# Logging setup: configure the root logger once at import time and create
# the named logger used by every function in this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('splitter')
|
||||
|
||||
# RabbitMQ connection settings. Each value can be overridden with an
# environment variable of the same name; the defaults are the values that
# were previously hard-coded here.
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'rabbitmq')
RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'user')
RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS', 'password')

# Queue this service consumes from (SPLIT_QUEUE) and the queue it publishes
# the resulting segments to (PROCESS_QUEUE).
SPLIT_QUEUE = 'audio_split_queue'
PROCESS_QUEUE = 'audio_process_queue'

# Segment configuration
SEGMENT_DURATION_MS = 2 * 60 * 1000  # 2 minutes, in milliseconds

# Directories: per-file segment folders are created under this path.
SHARED_DIR = '/app/shared'
|
||||
|
||||
def connect_to_rabbitmq():
    """Open a blocking connection to RabbitMQ, retrying until it succeeds.

    Every time pika raises ``AMQPConnectionError`` the failed attempt is
    counted, a warning is logged, and the function sleeps 5 seconds before
    trying again. There is no upper bound on the number of attempts.

    Returns:
        The ``pika.BlockingConnection`` that was established.
    """
    failed_attempts = 0
    while True:
        try:
            params = pika.ConnectionParameters(
                host=RABBITMQ_HOST,
                credentials=pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS),
            )
            return pika.BlockingConnection(params)
        except pika.exceptions.AMQPConnectionError:
            failed_attempts += 1
            tries = failed_attempts
            logger.warning(f"Intento {tries}: No se pudo conectar a RabbitMQ. Reintentando en 5 segundos...")
            time.sleep(5)
|
||||
|
||||
def split_audio(filepath, file_id, original_filename):
    """Split an audio file into consecutive WAV segments.

    The file is loaded with pydub and cut into chunks of at most
    ``SEGMENT_DURATION_MS`` milliseconds; the last chunk may be shorter.
    Each chunk is exported as WAV into the directory
    ``segments_<file_id>`` under ``SHARED_DIR`` (created if missing).

    Args:
        filepath: Path of the audio file to load.
        file_id: Identifier embedded in the directory and segment file names.
        original_filename: Not used inside this function.

    Returns:
        A tuple ``(segments_info, segments_dir)``. ``segments_info`` is a
        list with one dict per segment holding ``segment_id``,
        ``segment_path``, ``segment_filename``, ``start_ms``, ``end_ms`` and
        ``duration_ms``; ``segments_dir`` is the directory that contains the
        exported files.
    """
    logger.info(f"Dividiendo archivo: {filepath}")

    audio = AudioSegment.from_file(filepath)
    total_duration_ms = len(audio)

    # One start offset per segment. The length of this range equals the
    # ceiling of total_duration_ms / SEGMENT_DURATION_MS.
    segment_starts = range(0, total_duration_ms, SEGMENT_DURATION_MS)
    num_segments = len(segment_starts)

    logger.info(f"Audio de duración {total_duration_ms}ms será dividido en {num_segments} segmentos")

    segments_dir = os.path.join(SHARED_DIR, f"segments_{file_id}")
    os.makedirs(segments_dir, exist_ok=True)

    segments_info = []
    for i, start_ms in enumerate(segment_starts):
        # Clamp the end so the final segment stops at the end of the audio.
        end_ms = min(start_ms + SEGMENT_DURATION_MS, total_duration_ms)

        segment_filename = f"segment_{i:03d}_{file_id}.wav"
        segment_path = os.path.join(segments_dir, segment_filename)

        # Segments are written as WAV for the downstream Whisper step.
        audio[start_ms:end_ms].export(segment_path, format="wav")

        segments_info.append({
            'segment_id': i,
            'segment_path': segment_path,
            'segment_filename': segment_filename,
            'start_ms': start_ms,
            'end_ms': end_ms,
            'duration_ms': end_ms - start_ms,
        })

        logger.info(f"Creado segmento {i+1}/{num_segments}: {segment_filename}")

    return segments_info, segments_dir
|
||||
|
||||
def send_segments_to_process_queue(segments_info, original_file_id, original_filename, segments_dir):
    """Publish one persistent message per segment to the processing queue.

    A dedicated RabbitMQ connection is opened for the call and is always
    released, even when declaring the queue or publishing a message fails.

    Args:
        segments_info: List of per-segment dicts; each must contain at least
            ``segment_id`` (used in the log line). All keys are copied into
            the published message.
        original_file_id: Identifier of the file the segments came from.
        original_filename: Name of the file the segments came from.
        segments_dir: Directory that holds the exported segment files.

    Raises:
        Any exception raised by pika while declaring the queue or publishing
        propagates to the caller after the connection has been closed.
    """
    connection = connect_to_rabbitmq()
    try:
        channel = connection.channel()

        # Make sure the target queue exists before publishing to it.
        channel.queue_declare(queue=PROCESS_QUEUE, durable=True)

        # Job-level fields shared by every segment message.
        job_info = {
            'original_file_id': original_file_id,
            'original_filename': original_filename,
            'total_segments': len(segments_info),
            'segments_dir': segments_dir,
            'timestamp': datetime.now().isoformat()
        }

        for segment in segments_info:
            # Job fields are merged last, so they win on a key collision.
            message = {
                **segment,
                **job_info
            }

            channel.basic_publish(
                exchange='',
                routing_key=PROCESS_QUEUE,
                body=json.dumps(message),
                properties=pika.BasicProperties(
                    delivery_mode=2  # persistent message
                )
            )

            logger.info(f"Segmento {segment['segment_id']+1}/{job_info['total_segments']} enviado a la cola de procesamiento")
    finally:
        # Closing an already-closed connection raises in pika, which would
        # mask the original error; only close while it is still open.
        if connection.is_open:
            connection.close()
|
||||
|
||||
def callback(ch, method, properties, body):
    """Handle one message from the split queue.

    The message body is expected to be a JSON object with the keys
    ``file_id``, ``filepath`` and ``filename``. The referenced audio file is
    split into segments, the segments are published to the processing queue,
    and the message is acknowledged.

    Failure handling:
        * A body that is not valid JSON, is not an object, or lacks a
          required key can never succeed on redelivery, so it is rejected
          WITHOUT requeue (previously it was requeued and redelivered
          forever).
        * Any error while splitting or publishing is logged with its
          traceback and the message is requeued for another attempt.

    Args:
        ch: Channel the message was delivered on; used to ack/nack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the
            message being acknowledged.
        properties: Message properties (unused).
        body: Raw message payload.
    """
    try:
        message = json.loads(body)
        file_id = message['file_id']
        filepath = message['filepath']
        filename = message['filename']
    except (ValueError, KeyError, TypeError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError
        # subclasses; TypeError covers a JSON payload that is not an object.
        logger.error(f"Mensaje inválido descartado (no se reintentará): {str(e)}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        logger.info(f"Recibido archivo para división: {filename} ({file_id})")

        segments_info, segments_dir = split_audio(filepath, file_id, filename)
        send_segments_to_process_queue(segments_info, file_id, filename, segments_dir)
    except Exception as e:
        # logger.exception keeps the traceback, which the plain error log lost.
        logger.exception(f"Error procesando mensaje: {str(e)}")
        # Requeue so the message is retried.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        return

    # Acknowledge outside the try block: if the ack itself fails, the work is
    # already done and nacking the same delivery would not help.
    ch.basic_ack(delivery_tag=method.delivery_tag)

    logger.info(f"División completada para {filename} ({file_id})")
|
||||
|
||||
def main():
    """Run the splitter service until it is interrupted.

    Connects to RabbitMQ, declares the split and processing queues as
    durable, registers ``callback`` as the consumer of the split queue with
    a prefetch count of 1, and blocks consuming messages. A
    ``KeyboardInterrupt`` stops consumption; the connection is then closed.
    """
    conn = connect_to_rabbitmq()
    consumer_channel = conn.channel()

    # Declare both queues, split queue first.
    for queue_name in (SPLIT_QUEUE, PROCESS_QUEUE):
        consumer_channel.queue_declare(queue=queue_name, durable=True)

    # prefetch_count=1: the broker holds back further deliveries while one
    # message is still unacknowledged.
    consumer_channel.basic_qos(prefetch_count=1)

    consumer_channel.basic_consume(queue=SPLIT_QUEUE, on_message_callback=callback)

    logger.info("Servicio de división iniciado. Esperando mensajes...")

    try:
        consumer_channel.start_consuming()
    except KeyboardInterrupt:
        logger.info("Servicio de división detenido")
        consumer_channel.stop_consuming()

    conn.close()
|
||||
|
||||
# Start the service only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user