#!/usr/bin/env python import os import time import json import pika import logging from collections import defaultdict from datetime import datetime # Configuración de logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('unifier') # Configuración de RabbitMQ RABBITMQ_HOST = 'rabbitmq' RABBITMQ_USER = 'user' RABBITMQ_PASS = 'password' UNIFY_QUEUE = 'text_unify_queue' # Directorios SHARED_DIR = '/app/shared' OUTPUT_DIR = '/app/output' # Almacenamiento temporal de transcripciones por trabajo pending_transcriptions = defaultdict(dict) job_metadata = {} def connect_to_rabbitmq(): """Establece conexión con RabbitMQ""" tries = 0 while True: try: credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) connection = pika.BlockingConnection( pika.ConnectionParameters( host=RABBITMQ_HOST, credentials=credentials ) ) return connection except pika.exceptions.AMQPConnectionError: tries += 1 logger.warning(f"Intento {tries}: No se pudo conectar a RabbitMQ. Reintentando en 5 segundos...") time.sleep(5) def process_transcription(segment_info): """Procesa una transcripción recibida""" original_file_id = segment_info['original_file_id'] segment_id = segment_info['segment_id'] transcription_path = segment_info['transcription_path'] total_segments = segment_info['total_segments'] # Almacenar metadatos del trabajo si no existen if original_file_id not in job_metadata: job_metadata[original_file_id] = { 'original_filename': segment_info['original_filename'], 'total_segments': total_segments, 'start_time': datetime.now() } # Leer transcripción with open(transcription_path, 'r', encoding='utf-8') as f: text = f.read() # Almacenar transcripción pending_transcriptions[original_file_id][segment_id] = { 'text': text, 'start_ms': segment_info['start_ms'], 'end_ms': segment_info['end_ms'] } logger.info(f"Recibida transcripción {segment_id+1}/{total_segments} para {original_file_id}") # Verificar si se han recibido todas las transcripciones para este trabajo if len(pending_transcriptions[original_file_id]) == total_segments: unify_transcriptions(original_file_id) def convert_milliseconds_to_srt_time(ms): """Convierte milisegundos a formato de tiempo SRT (HH:MM:SS,mmm)""" seconds, milliseconds = divmod(ms, 1000) minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}" def unify_transcriptions(original_file_id): """Unifica las transcripciones de un trabajo y genera el archivo de subtítulos""" logger.info(f"Unificando transcripciones para {original_file_id}") # Obtener metadatos del trabajo metadata = job_metadata[original_file_id] original_filename = metadata['original_filename'] # Preparar nombre de archivo de salida base_filename = os.path.splitext(original_filename)[0] output_filename = f"{base_filename}_transcription.srt" output_path = os.path.join(OUTPUT_DIR, output_filename) # Ordenar transcripciones por segment_id sorted_segments = sorted(pending_transcriptions[original_file_id].items(), key=lambda x: x[0]) # Crear directorio de salida si no existe os.makedirs(OUTPUT_DIR, exist_ok=True) # Generar archivo SRT (formato de subtítulos) with open(output_path, 'w', encoding='utf-8') as f: for i, (segment_id, segment_data) in enumerate(sorted_segments): # Formatear tiempo de inicio y fin start_time = convert_milliseconds_to_srt_time(segment_data['start_ms']) end_time = convert_milliseconds_to_srt_time(segment_data['end_ms']) # Escribir entrada de subtítulo f.write(f"{i+1}\n") f.write(f"{start_time} --> {end_time}\n") f.write(f"{segment_data['text'].strip()}\n\n") # Calcular tiempo total de procesamiento total_time = datetime.now() - metadata['start_time'] logger.info(f"Archivo de subtítulos generado: {output_path}") logger.info(f"Tiempo total de procesamiento: {total_time}") # Limpiar datos del trabajo completado del pending_transcriptions[original_file_id] del job_metadata[original_file_id] def callback(ch, method, properties, body): """Callback para procesar mensajes de la cola de unificación""" try: # Decodificar mensaje segment_info = json.loads(body) # Procesar transcripción process_transcription(segment_info) # Confirmar procesamiento ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"Error procesando mensaje: {str(e)}") # Rechazar mensaje en caso de error para reintentarlo ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) def main(): """Función principal""" # Asegurar que directorios existen os.makedirs(OUTPUT_DIR, exist_ok=True) # Conexión a RabbitMQ connection = connect_to_rabbitmq() channel = connection.channel() # Declarar cola channel.queue_declare(queue=UNIFY_QUEUE, durable=True) # Configurar prefetch count channel.basic_qos(prefetch_count=1) # Configurar callback channel.basic_consume(queue=UNIFY_QUEUE, on_message_callback=callback) logger.info("Servicio unificador iniciado. Esperando transcripciones...") # Iniciar consumo try: channel.start_consuming() except KeyboardInterrupt: logger.info("Servicio unificador detenido") channel.stop_consuming() connection.close() if __name__ == "__main__": main()