Capítulo 5: Streaming en Tiempo Real
En los capítulos anteriores hemos construido workflows complejos, pero todas las respuestas llegaban de una vez al final. En el mundo real, los usuarios esperan feedback inmediato y actualizaciones en tiempo real.
Imagínate que la Taquería Doña Carmen quiere implementar:
- Notificaciones en vivo del progreso de pedidos
- Actualizaciones automáticas cuando cambia el estado del repartidor
- Streaming de eventos para múltiples clientes simultáneamente
- Procesamiento continuo de pedidos sin bloquear la interfaz
¡Vamos a construir un sistema de streaming completo!
El problema del tiempo real
Cuando un cliente hace un pedido, quiere saber:
"¿Ya empezaron a preparar mi comida?" → Actualización inmediata
"¿Dónde está mi repartidor?" → Ubicación en tiempo real
"¿Cuánto falta?" → Estimación actualizada constantemente
El streaming nos permite enviar estas actualizaciones conforme suceden, no al final del proceso.
Conceptos básicos de streaming
¿Qué es streaming en LlamaIndex?
El streaming permite que los agentes envíen respuestas por partes mientras procesan información:
1// Sin streaming: esperas todo el resultado 2const respuesta = await agente.run({ message: "Hola" }); 3console.log(respuesta); // Se imprime todo de una vez 4 5// Con streaming: recibes partes conforme se generan 6const stream = await agente.runStream({ message: "Hola" }); 7for await (const chunk of stream) { 8 process.stdout.write(chunk.delta); // Se imprime palabra por palabra 9} 10
Ventajas del streaming
- Experiencia más fluida: El usuario ve progreso inmediato
- Mejor percepción de velocidad: Parece más rápido aunque tome el mismo tiempo
- Feedback temprano: Puedes mostrar resultados parciales
- Interactividad: El usuario puede interrumpir o modificar el proceso
Implementando streaming básico
Empezemos con un ejemplo simple de streaming:
1import { agent, tool } from "llamaindex"; 2 3// Herramienta que simula procesamiento paso a paso 4const procesarPedidoConStreaming = tool( 5 async ({ pedido }: { pedido: string }) => { 6 // Simular pasos de procesamiento 7 const pasos = [ 8 "🔍 Analizando tu pedido...", 9 "📋 Verificando ingredientes...", 10 "💰 Calculando precios...", 11 "✅ Pedido procesado correctamente", 12 ]; 13 14 let resultado = ""; 15 16 for (const paso of pasos) { 17 resultado += paso + "\n"; 18 // Simular tiempo de procesamiento 19 await new Promise((resolve) => setTimeout(resolve, 1000)); 20 } 21 22 return { 23 pasos: pasos.length, 24 resultado, 25 mensaje: "Procesamiento completado con streaming", 26 }; 27 }, 28 { 29 name: "procesar_pedido_streaming", 30 description: "Procesa un pedido mostrando progreso paso a paso", 31 } 32); 33 34const agenteStreamingBasico = agent({ 35 tools: [procesarPedidoConStreaming], 36 systemPrompt: ` 37 Eres un asistente que muestra el progreso de procesamiento en tiempo real. 38 39 Cuando proceses un pedido: 40 1. Usa la herramienta de procesamiento con streaming 41 2. Explica cada paso conforme sucede 42 3. Mantén al usuario informado del progreso 43 4. Proporciona feedback continuo 44 45 Ejemplo de flujo: 46 "¡Hola! Voy a procesar tu pedido paso a paso: 47 48 [usar herramienta] 49 50 ¡Listo! Tu pedido ha sido procesado exitosamente." 51 `, 52}); 53
Streaming avanzado con eventos
Ahora vamos a crear un sistema más sofisticado que maneja eventos en tiempo real:
1// Simulador de eventos en tiempo real 2class EventosPedido { 3 private listeners: Map<string, Function[]> = new Map(); 4 5 // Suscribirse a eventos 6 on(evento: string, callback: Function) { 7 if (!this.listeners.has(evento)) { 8 this.listeners.set(evento, []); 9 } 10 this.listeners.get(evento)!.push(callback); 11 } 12 13 // Emitir evento 14 emit(evento: string, data: any) { 15 const callbacks = this.listeners.get(evento) || []; 16 callbacks.forEach((callback) => callback(data)); 17 } 18 19 // Simular progreso de pedido 20 async simularProgresoPedido(pedidoId: string) { 21 const eventos = [ 22 { tipo: "pedido_recibido", mensaje: "📝 Pedido recibido", tiempo: 0 }, 23 { 24 tipo: "preparacion_iniciada", 25 mensaje: "👨🍳 Iniciando preparación", 26 tiempo: 2000, 27 }, 28 { tipo: "comida_lista", mensaje: "🍽️ Comida lista", tiempo: 15000 }, 29 { 30 tipo: "repartidor_asignado", 31 mensaje: "🚚 Repartidor en camino", 32 tiempo: 17000, 33 }, 34 { tipo: "en_ruta", mensaje: "🛣️ En ruta a tu domicilio", tiempo: 20000 }, 35 { tipo: "entregado", mensaje: "✅ ¡Pedido entregado!", tiempo: 35000 }, 36 ]; 37 38 for (const evento of eventos) { 39 setTimeout(() => { 40 this.emit("progreso_pedido", { 41 pedidoId, 42 ...evento, 43 timestamp: new Date().toLocaleTimeString(), 44 }); 45 }, evento.tiempo); 46 } 47 } 48} 49 50const eventosManager = new EventosPedido(); 51 52// Herramienta para iniciar seguimiento en tiempo real 53const iniciarSeguimientoTiempoReal = tool( 54 async ({ pedidoId }: { pedidoId: string }) => { 55 // Iniciar simulación de eventos 56 eventosManager.simularProgresoPedido(pedidoId); 57 58 return { 59 pedidoId, 60 seguimientoIniciado: true, 61 mensaje: `Seguimiento en tiempo real iniciado para pedido ${pedidoId}`, 62 linkSeguimiento: `https://taqueria.com/live/${pedidoId}`, 63 }; 64 }, 65 { 66 name: "iniciar_seguimiento_tiempo_real", 67 description: "Inicia el seguimiento en tiempo real de un pedido", 68 } 69); 70 71// Herramienta para obtener actualizaciones en vivo 72const obtenerActualizacionesVivo = tool( 73 async ({ pedidoId }: { pedidoId: string }) => { 74 return new Promise((resolve) => { 75 const actualizaciones: any[] = []; 76 77 // Escuchar eventos por 5 segundos 78 const listener = (data: any) => { 79 if (data.pedidoId === pedidoId) { 80 actualizaciones.push(data); 81 } 82 }; 83 84 eventosManager.on("progreso_pedido", listener); 85 86 setTimeout(() => { 87 resolve({ 88 pedidoId, 89 actualizaciones, 90 mensaje: `${actualizaciones.length} actualizaciones recibidas`, 91 }); 92 }, 5000); 93 }); 94 }, 95 { 96 name: "obtener_actualizaciones_vivo", 97 description: "Obtiene actualizaciones en tiempo real de un pedido", 98 } 99); 100
Agente de streaming completo
Ahora creamos un agente que maneja streaming y eventos en tiempo real:
1const agenteStreamingCompleto = agent({ 2 tools: [ 3 procesarPedidoConStreaming, 4 iniciarSeguimientoTiempoReal, 5 obtenerActualizacionesVivo, 6 ], 7 systemPrompt: ` 8 Eres el especialista en seguimiento en tiempo real de Taquería Doña Carmen. 9 10 Tu trabajo es proporcionar actualizaciones continuas y streaming de información: 11 12 PARA NUEVOS PEDIDOS: 13 1. Procesa el pedido con streaming paso a paso 14 2. Inicia seguimiento en tiempo real 15 3. Proporciona link de seguimiento en vivo 16 4. Explica cómo funcionan las notificaciones 17 18 PARA CONSULTAS DE ESTADO: 19 1. Obtén actualizaciones en vivo del pedido 20 2. Presenta la información de forma clara 21 3. Indica el siguiente paso esperado 22 4. Proporciona tiempo estimado actualizado 23 24 IMPORTANTE: 25 - Usa streaming para mostrar progreso paso a paso 26 - Proporciona feedback continuo al usuario 27 - Explica cada actualización que recibas 28 - Mantén al usuario informado en todo momento 29 - Usa emojis para hacer más visual el progreso 30 31 Ejemplo de flujo: 32 "¡Hola! Voy a procesar tu pedido en tiempo real: 33 34 🔍 Analizando tu pedido... 35 ✅ Pedido analizado 36 37 📋 Verificando ingredientes... 38 ✅ Ingredientes disponibles 39 40 [continuar con streaming...] 41 42 🚀 ¡Seguimiento en tiempo real activado!" 43 `, 44}); 45
Sistema de notificaciones push
Vamos a crear un sistema que simule notificaciones push reales:
1// Simulador de notificaciones push 2class NotificacionesPush { 3 private suscriptores: Map<string, any> = new Map(); 4 5 // Suscribir cliente a notificaciones 6 suscribir(clienteId: string, callback: Function) { 7 this.suscriptores.set(clienteId, callback); 8 console.log(`📱 Cliente ${clienteId} suscrito a notificaciones push`); 9 } 10 11 // Enviar notificación a cliente específico 12 enviarNotificacion(clienteId: string, notificacion: any) { 13 const callback = this.suscriptores.get(clienteId); 14 if (callback) { 15 callback(notificacion); 16 } 17 } 18 19 // Enviar notificación a todos los clientes 20 broadcast(notificacion: any) { 21 this.suscriptores.forEach((callback, clienteId) => { 22 console.log(`📢 Enviando a ${clienteId}:`, notificacion.mensaje); 23 callback(notificacion); 24 }); 25 } 26} 27 28const pushManager = new NotificacionesPush(); 29 30// Herramienta para gestionar notificaciones 31const gestionarNotificaciones = tool( 32 async ({ 33 accion, 34 clienteId, 35 mensaje, 36 }: { 37 accion: "suscribir" | "enviar" | "broadcast"; 38 clienteId?: string; 39 mensaje?: string; 40 }) => { 41 switch (accion) { 42 case "suscribir": 43 if (clienteId) { 44 pushManager.suscribir(clienteId, (notif: any) => { 45 console.log(`🔔 [${clienteId}] ${notif.mensaje}`); 46 }); 47 return { 48 suscrito: true, 49 mensaje: `Cliente ${clienteId} suscrito a notificaciones`, 50 }; 51 } 52 break; 53 54 case "enviar": 55 if (clienteId && mensaje) { 56 pushManager.enviarNotificacion(clienteId, { 57 mensaje, 58 timestamp: new Date().toLocaleTimeString(), 59 }); 60 return { 61 enviado: true, 62 mensaje: `Notificación enviada a ${clienteId}`, 63 }; 64 } 65 break; 66 67 case "broadcast": 68 if (mensaje) { 69 pushManager.broadcast({ 70 mensaje, 71 timestamp: new Date().toLocaleTimeString(), 72 }); 73 return { 74 broadcast: true, 75 mensaje: "Notificación enviada a todos los clientes", 76 }; 77 } 78 break; 79 } 80 81 return { error: "Parámetros inválidos" }; 82 }, 83 { 84 name: "gestionar_notificaciones", 85 description: "Gestiona suscripciones y envío de notificaciones push", 86 } 87); 88
Ejemplo completo de streaming
Ahora vamos a crear una demostración completa del sistema:
1async function demoStreamingCompleto() { 2 console.log("🔴 DEMO STREAMING EN TIEMPO REAL - TAQUERÍA DOÑA CARMEN"); 3 console.log("=".repeat(70)); 4 5 // Simular cliente suscribiéndose a notificaciones 6 console.log("\n📱 Cliente María se suscribe a notificaciones..."); 7 await agenteStreamingCompleto.runStream({ 8 message: "Quiero suscribirme a notificaciones para mi pedido", 9 }); 10 11 console.log("\n" + "=".repeat(70)); 12 13 // Procesar pedido con streaming 14 console.log("\n🍽️ Procesando pedido con streaming en tiempo real..."); 15 console.log("-".repeat(50)); 16 17 const streamPedido = await agenteStreamingCompleto.runStream({ 18 message: `Hola, quiero hacer un pedido: 3 tacos de pastor y 2 quesadillas. 19 Quiero seguimiento en tiempo real por favor.`, 20 }); 21 22 for await (const chunk of streamPedido) { 23 process.stdout.write(chunk.delta); 24 // Simular pequeña pausa para efecto visual 25 await new Promise((resolve) => setTimeout(resolve, 50)); 26 } 27 28 console.log("\n" + "=".repeat(70)); 29 30 // Simular consulta de estado en tiempo real 31 console.log("\n📊 Consultando estado del pedido en tiempo real..."); 32 console.log("-".repeat(50)); 33 34 // Esperar un poco para que haya eventos 35 await new Promise((resolve) => setTimeout(resolve, 3000)); 36 37 const streamEstado = await agenteStreamingCompleto.runStream({ 38 message: "¿Cuál es el estado actual de mi pedido PED123456?", 39 }); 40 41 for await (const chunk of streamEstado) { 42 process.stdout.write(chunk.delta); 43 await new Promise((resolve) => setTimeout(resolve, 50)); 44 } 45 46 console.log("\n" + "=".repeat(70)); 47 48 // Demostrar notificaciones broadcast 49 console.log("\n📢 Enviando notificación a todos los clientes..."); 50 51 const streamBroadcast = await agenteStreamingCompleto.runStream({ 52 message: 53 "Envía una notificación a todos los clientes sobre una promoción especial", 54 }); 55 56 for await (const chunk of streamBroadcast) { 57 process.stdout.write(chunk.delta); 58 await new Promise((resolve) => setTimeout(resolve, 50)); 59 } 60} 61
Streaming con WebSockets (simulado)
Para aplicaciones web reales, usarías WebSockets. Aquí una simulación:
1// Simulador de WebSocket para streaming 2class WebSocketSimulado { 3 private conexiones: Map<string, Function> = new Map(); 4 5 // Simular conexión de cliente 6 conectar(clienteId: string, onMessage: Function) { 7 this.conexiones.set(clienteId, onMessage); 8 console.log(`🔌 Cliente ${clienteId} conectado via WebSocket`); 9 10 // Enviar mensaje de bienvenida 11 this.enviarMensaje(clienteId, { 12 tipo: "conexion", 13 mensaje: "¡Conectado al streaming en tiempo real!", 14 }); 15 } 16 17 // Enviar mensaje a cliente específico 18 enviarMensaje(clienteId: string, data: any) { 19 const callback = this.conexiones.get(clienteId); 20 if (callback) { 21 callback(data); 22 } 23 } 24 25 // Streaming continuo de datos 26 async iniciarStreaming(clienteId: string, datos: any[]) { 27 for (const dato of datos) { 28 this.enviarMensaje(clienteId, { 29 tipo: "stream", 30 data: dato, 31 timestamp: new Date().toISOString(), 32 }); 33 34 // Pausa entre mensajes para simular streaming 35 await new Promise((resolve) => setTimeout(resolve, 1000)); 36 } 37 38 // Mensaje de finalización 39 this.enviarMensaje(clienteId, { 40 tipo: "fin_stream", 41 mensaje: "Streaming completado", 42 }); 43 } 44} 45 46const wsManager = new WebSocketSimulado(); 47 48// Herramienta para manejar WebSocket streaming 49const manejarWebSocketStreaming = tool( 50 async ({ 51 clienteId, 52 accion, 53 datos, 54 }: { 55 clienteId: string; 56 accion: "conectar" | "stream" | "desconectar"; 57 datos?: any[]; 58 }) => { 59 switch (accion) { 60 case "conectar": 61 wsManager.conectar(clienteId, (message: any) => { 62 console.log(`📨 [${clienteId}] Recibido:`, message); 63 }); 64 return { conectado: true, mensaje: `Cliente ${clienteId} conectado` }; 65 66 case "stream": 67 if (datos) { 68 await wsManager.iniciarStreaming(clienteId, datos); 69 return { 70 streaming: true, 71 mensaje: `Streaming iniciado para ${clienteId}`, 72 }; 73 } 74 break; 75 76 case "desconectar": 77 // Simular desconexión 78 return { 79 desconectado: true, 80 mensaje: `Cliente ${clienteId} desconectado`, 81 }; 82 } 83 84 return { error: "Acción no válida" }; 85 }, 86 { 87 name: "manejar_websocket_streaming", 88 description: 89 "Maneja conexiones WebSocket y streaming de datos en tiempo real", 90 } 91); 92
Ejecutando el sistema completo
1async function ejecutarSistemaStreaming() { 2 console.log("🚀 INICIANDO SISTEMA DE STREAMING COMPLETO"); 3 console.log("=".repeat(60)); 4 5 try { 6 // 1. Demo básico de streaming 7 await demoStreamingCompleto(); 8 9 console.log("\n\n🔌 DEMO WEBSOCKET STREAMING"); 10 console.log("=".repeat(60)); 11 12 // 2. Demo WebSocket streaming 13 const clienteId = "cliente_maria_123"; 14 15 // Conectar cliente 16 await manejarWebSocketStreaming({ 17 clienteId, 18 accion: "conectar", 19 }); 20 21 // Iniciar streaming de datos 22 const datosStreaming = [ 23 { evento: "pedido_recibido", mensaje: "📝 Tu pedido ha sido recibido" }, 24 { evento: "preparacion", mensaje: "👨🍳 Preparando tu comida..." }, 25 { evento: "listo", mensaje: "🍽️ ¡Tu comida está lista!" }, 26 { evento: "en_camino", mensaje: "🚚 Repartidor en camino" }, 27 { evento: "entregado", mensaje: "✅ ¡Pedido entregado!" }, 28 ]; 29 30 await manejarWebSocketStreaming({ 31 clienteId, 32 accion: "stream", 33 datos: datosStreaming, 34 }); 35 36 console.log("\n✅ Demo de streaming completado exitosamente!"); 37 } catch (error) { 38 console.error("❌ Error en el sistema de streaming:", error); 39 } 40} 41 42// Ejecutar el sistema 43ejecutarSistemaStreaming().catch(console.error); 44
Resultado esperado
🚀 INICIANDO SISTEMA DE STREAMING COMPLETO
============================================================
📱 Cliente María se suscribe a notificaciones...
¡Hola María! Te voy a suscribir a nuestras notificaciones en tiempo real...
✅ ¡Suscripción completada! Ahora recibirás actualizaciones automáticas.
======================================================================
🍽️ Procesando pedido con streaming en tiempo real...
--------------------------------------------------
¡Hola! Voy a procesar tu pedido en tiempo real:
🔍 Analizando tu pedido...
✅ 3 tacos de pastor detectados
✅ 2 quesadillas detectadas
📋 Verificando ingredientes...
✅ Pastor disponible (suficiente stock)
✅ Queso disponible (suficiente stock)
✅ Tortillas disponibles
💰 Calculando precios...
• 3 tacos de pastor: $45
• 2 quesadillas: $50
• Total: $95
🚀 Iniciando seguimiento en tiempo real...
📱 Link de seguimiento: https://taqueria.com/live/PED123456
🔔 Notificaciones activadas
✅ ¡Pedido procesado! Recibirás actualizaciones automáticas.
======================================================================
📊 Consultando estado del pedido en tiempo real...
--------------------------------------------------
Consultando el estado actual de tu pedido PED123456...
📡 Obteniendo actualizaciones en vivo...
🔄 Actualizaciones recibidas:
📝 15:30:15 - Pedido recibido
👨🍳 15:32:15 - Iniciando preparación
🍽️ 15:47:15 - Comida lista
📍 Estado actual: Comida lista, esperando repartidor
⏱️ Próximo paso: Asignación de repartidor (2-3 minutos)
======================================================================
🔌 DEMO WEBSOCKET STREAMING
============================================================
🔌 Cliente cliente_maria_123 conectado via WebSocket
📨 [cliente_maria_123] Recibido: {
tipo: 'conexion',
mensaje: '¡Conectado al streaming en tiempo real!'
}
📨 [cliente_maria_123] Recibido: {
tipo: 'stream',
data: { evento: 'pedido_recibido', mensaje: '📝 Tu pedido ha sido recibido' },
timestamp: '2024-01-15T15:30:00.000Z'
}
📨 [cliente_maria_123] Recibido: {
tipo: 'stream',
data: { evento: 'preparacion', mensaje: '👨🍳 Preparando tu comida...' },
timestamp: '2024-01-15T15:31:00.000Z'
}
[... más actualizaciones en tiempo real ...]
✅ Demo de streaming completado exitosamente!
¿Qué acabamos de lograr?
- Streaming paso a paso: Los usuarios ven el progreso conforme sucede
- Eventos en tiempo real: Actualizaciones automáticas sin necesidad de preguntar
- Notificaciones push: Sistema de notificaciones para múltiples clientes
- WebSocket simulado: Base para implementación web real
- Experiencia fluida: Feedback continuo que mejora la percepción del usuario
Conceptos clave aprendidos
Streaming vs. Respuestas tradicionales
Tradicional: Esperar → Procesar → Responder todo junto Streaming: Procesar → Enviar partes → Continuar procesando
Eventos asíncronos
Los eventos permiten que diferentes partes del sistema se comuniquen sin bloquear el flujo principal.
Gestión de estado en tiempo real
El sistema mantiene estado actualizado y lo comparte con múltiples clientes simultáneamente.
Experiencia de usuario mejorada
El streaming hace que las aplicaciones se sientan más responsivas y modernas.
Mejoras que puedes implementar
1. Persistencia de eventos
1// Guardar eventos en base de datos para recuperación 2const guardarEvento = tool(async ({ evento, pedidoId }) => { 3 // await db.eventos.create({ evento, pedidoId, timestamp: new Date() }); 4 return { guardado: true }; 5}); 6
2. Filtros de notificaciones
1// Permitir que usuarios configuren qué notificaciones recibir 2const configurarNotificaciones = tool(async ({ clienteId, preferencias }) => { 3 // Solo enviar notificaciones que el cliente quiere recibir 4 return { configurado: true }; 5}); 6
3. Streaming con límites de velocidad
1// Controlar la velocidad de streaming para no abrumar al usuario 2const streamingControlado = async (datos: any[], velocidad: number) => { 3 for (const dato of datos) { 4 await enviarDato(dato); 5 await new Promise((resolve) => setTimeout(resolve, velocidad)); 6 } 7}; 8
Lo que viene
En el siguiente capítulo exploraremos Integrando Tools Externos, donde aprenderemos:
- Cómo conectar workflows con APIs externas
- Integración con servicios de terceros
- Manejo de autenticación y rate limiting
- Patrones para sistemas híbridos
¡Ya tienes streaming en tiempo real funcionando! 🎉
Ejercicio práctico
Expande el sistema de streaming:
Nivel 1: Básico
- Agregar más tipos de eventos (promociones, alertas de inventario)
- Implementar filtros de streaming (solo eventos importantes)
- Crear sistema de replay (reproducir eventos pasados)
Nivel 2: Intermedio
- Integrar con WebSockets reales usando Socket.io
- Implementar salas de chat para soporte en tiempo real
- Crear dashboard de monitoreo con métricas en vivo
Nivel 3: Avanzado
- Implementar streaming con backpressure (control de flujo)
- Crear sistema de eventos distribuido con Redis
- Implementar streaming de video/audio para llamadas de soporte
¿Te animas a llevarlo al siguiente nivel? En el próximo capítulo veremos cómo integrar con el mundo exterior.
