use futures_util::{SinkExt, StreamExt}; use serde_json::json; use solana_sdk::pubkey::Pubkey; use tokio_tungstenite::{connect_async, tungstenite::Message}; use crate::events::primary_market::types::PrimaryMarketEvent; use crate::events::registry::types::RegistryEvent; use crate::events::router::{route_registry_event, route_secondary_market_event, route_primary_market_event}; use crate::events::secondary_market::types::SecondaryMarketEvent; use crate::state::AppState; pub struct ProgramEventListener { program_id: Pubkey, ws_url: String, state: AppState, market_pubkey: String, } impl ProgramEventListener { pub fn new(program_id: Pubkey, ws_url: String, state: AppState) -> Self { // Derive market PDA using the same seeds as on-chain program let (market_pda, _bump) = Pubkey::find_program_address(&[b"market"], &program_id); Self { program_id, ws_url, state, market_pubkey: market_pda.to_string(), } } pub async fn start(&self) -> Result<(), Box> { println!( "🔌 Connecting to WebSocket: {} for program: {}", self.ws_url, self.program_id ); println!("📍 Market PDA: {}", self.market_pubkey); let (ws_stream, _) = connect_async(&self.ws_url).await?; let (mut write, mut read) = ws_stream.split(); let subscribe_msg = json!({ "jsonrpc": "2.0", "id": 1, "method": "logsSubscribe", "params": [ { "mentions": [self.program_id.to_string()] }, { "commitment": "finalized" } ] }); write.send(Message::Text(subscribe_msg.to_string())).await?; println!("✅ Subscribed to program logs"); while let Some(msg) = read.next().await { match msg { Ok(Message::Text(text)) => { if let Err(e) = self.handle_log_message(&text).await { eprintln!("❌ Error handling log message: {}", e); } } Ok(Message::Close(_)) => { println!("⚠️ WebSocket connection closed"); break; } Err(e) => { eprintln!("❌ WebSocket error: {}", e); break; } _ => {} } } Ok(()) } async fn handle_log_message(&self, text: &str) -> Result<(), Box> { let parsed: serde_json::Value = serde_json::from_str(text)?; if let Some(logs) = parsed["params"]["result"]["value"]["logs"].as_array() { for log in logs { if let Some(log_str) = log.as_str() { if log_str.starts_with("Program data: ") { let base64_data = log_str.trim_start_matches("Program data: ").trim(); // Try to parse as RegistryEvent first if let Ok(event) = RegistryEvent::from_base64(base64_data) { println!("📨 Received Registry Event: {:?}", event); route_registry_event(event).await; continue; } // Try to parse as SecondaryMarketEvent if let Ok(event) = SecondaryMarketEvent::from_base64(base64_data) { println!("📨 Received Secondary Market Event: {:?}", event); route_secondary_market_event( event, self.state.clone(), self.market_pubkey.clone(), ) .await; continue; } if let Ok(event) = PrimaryMarketEvent::from_base64(base64_data) { println!("📨 Received Primary Market Event: {:?}", event); route_primary_market_event(event, self.state.clone()).await; continue; } eprintln!("⚠️ Failed to parse event from any known type"); } } } } Ok(()) } }