# Tests for SSE Aggregator library(testthat) library(aisdk) # ============================================================================ # SSEAggregator Core Tests # ============================================================================ test_that("SSEAggregator accumulates text deltas", { chunks <- character() done_called <- FALSE agg <- SSEAggregator$new(function(text, done) { if (done) { done_called <<- TRUE } else { chunks <<- c(chunks, text) } }) agg$on_text_delta("Hello") agg$on_text_delta(", ") agg$on_text_delta("World!") agg$on_done() result <- agg$build_result() expect_equal(result$text, "Hello, World!") expect_equal(chunks, c("Hello", ", ", "World!")) expect_true(done_called) }) test_that("SSEAggregator handles reasoning transitions", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) # Start reasoning agg$on_reasoning_delta("thinking...") # Transition to content (should auto-close reasoning) agg$on_text_delta("answer") agg$on_done() result <- agg$build_result() expect_equal(result$text, "answer") expect_equal(result$reasoning, "thinking...") # Verify callback sequence: → thinking... → → answer expect_equal(chunks[1], "\n") expect_equal(chunks[2], "thinking...") expect_equal(chunks[3], "\n\n\n") expect_equal(chunks[4], "answer") }) test_that("SSEAggregator closes reasoning on done", { chunks <- character() done_called <- FALSE agg <- SSEAggregator$new(function(text, done) { if (done) { done_called <<- TRUE } else { chunks <<- c(chunks, text) } }) agg$on_reasoning_delta("still thinking") agg$on_done() result <- agg$build_result() expect_true(done_called) # Reasoning should be closed before done callback expect_true("\n\n\n" %in% chunks) }) test_that("SSEAggregator handles reasoning_start and block_stop", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) agg$on_reasoning_start() agg$on_reasoning_delta("deep thought") agg$on_block_stop() # Should close reasoning agg$on_text_delta("answer") agg$on_done() result <- agg$build_result() expect_equal(result$reasoning, "deep thought") expect_equal(result$text, "answer") # block_stop should have closed reasoning, so no double close on text_delta expect_equal(sum(chunks == "\n"), 1) expect_equal(sum(chunks == "\n\n\n"), 1) }) test_that("SSEAggregator ignores empty text deltas", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) agg$on_text_delta(NULL) agg$on_text_delta("") agg$on_reasoning_delta(NULL) agg$on_reasoning_delta("") agg$on_done() result <- agg$build_result() expect_equal(result$text, "") expect_equal(result$reasoning, "") expect_length(chunks, 0) }) # ============================================================================ # Tool Call Tests — OpenAI Format # ============================================================================ test_that("SSEAggregator accumulates OpenAI-format tool calls", { agg <- SSEAggregator$new(function(text, done) {}) # First chunk: tool call start with id and name agg$on_tool_call_delta(list( list( index = 0, id = "call_123", `function` = list(name = "get_weather", arguments = '{"ci') ) )) # Second chunk: continue arguments agg$on_tool_call_delta(list( list( index = 0, `function` = list(arguments = 'ty": "SF"}') ) )) agg$on_finish_reason("tool_calls") result <- agg$build_result() expect_equal(length(result$tool_calls), 1) expect_equal(result$tool_calls[[1]]$id, "call_123") expect_equal(result$tool_calls[[1]]$name, "get_weather") expect_equal(result$tool_calls[[1]]$arguments$city, "SF") expect_equal(result$finish_reason, "tool_calls") }) test_that("SSEAggregator handles multiple tool calls", { agg <- SSEAggregator$new(function(text, done) {}) # Tool 1 agg$on_tool_call_delta(list( list(index = 0, id = "call_1", `function` = list(name = "fn1", arguments = '{"a":1}')) )) # Tool 2 agg$on_tool_call_delta(list( list(index = 1, id = "call_2", `function` = list(name = "fn2", arguments = '{"b":2}')) )) result <- agg$build_result() expect_equal(length(result$tool_calls), 2) expect_equal(result$tool_calls[[1]]$name, "fn1") expect_equal(result$tool_calls[[2]]$name, "fn2") }) test_that("SSEAggregator handles tool calls with list arguments", { agg <- SSEAggregator$new(function(text, done) {}) agg$on_tool_call_delta(list( list( index = 0, id = "call_x", `function` = list(name = "test_fn"), arguments = list(key = "value") ) )) result <- agg$build_result() expect_equal(result$tool_calls[[1]]$arguments$key, "value") }) test_that("SSEAggregator filters tool calls with empty names", { agg <- SSEAggregator$new(function(text, done) {}) agg$on_tool_call_delta(list( list(index = 0, id = "call_1", `function` = list(name = "valid_fn", arguments = "{}")) )) # Tool with empty name agg$on_tool_call_delta(list( list(index = 1, id = "call_2", `function` = list(name = "", arguments = "{}")) )) result <- agg$build_result() expect_equal(length(result$tool_calls), 1) expect_equal(result$tool_calls[[1]]$name, "valid_fn") }) # ============================================================================ # Tool Call Tests — Anthropic Format # ============================================================================ test_that("SSEAggregator handles Anthropic-format tool calls", { agg <- SSEAggregator$new(function(text, done) {}) # content_block_start with tool_use agg$on_tool_start(index = 0, id = "toolu_123", name = "get_weather") # input_json_delta chunks agg$on_tool_input_delta(0, '{"ci') agg$on_tool_input_delta(0, 'ty": ') agg$on_tool_input_delta(0, '"London"}') result <- agg$build_result() expect_equal(length(result$tool_calls), 1) expect_equal(result$tool_calls[[1]]$id, "toolu_123") expect_equal(result$tool_calls[[1]]$name, "get_weather") expect_equal(result$tool_calls[[1]]$arguments$city, "London") }) # ============================================================================ # Metadata Tests # ============================================================================ test_that("SSEAggregator stores usage and finish_reason", { agg <- SSEAggregator$new(function(text, done) {}) agg$on_text_delta("Hi") agg$on_usage(list(prompt_tokens = 10, completion_tokens = 5, total_tokens = 15)) agg$on_finish_reason("stop") agg$on_done() result <- agg$build_result() expect_equal(result$finish_reason, "stop") expect_equal(result$usage$prompt_tokens, 10) expect_equal(result$usage$total_tokens, 15) }) test_that("SSEAggregator stores raw response", { agg <- SSEAggregator$new(function(text, done) {}) raw <- list(id = "chatcmpl-123", model = "gpt-4o") agg$on_raw_response(raw) agg$on_done() result <- agg$build_result() expect_equal(result$raw_response$id, "chatcmpl-123") }) # ============================================================================ # OpenAI Event Mapper Tests # ============================================================================ test_that("map_openai_chunk handles text delta", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) data <- list( choices = list( list( delta = list(content = "Hello"), finish_reason = NULL ) ) ) map_openai_chunk(data, done = FALSE, agg) expect_equal(chunks, "Hello") }) test_that("map_openai_chunk handles reasoning content", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) data <- list( choices = list( list( delta = list(reasoning_content = "Let me think..."), finish_reason = NULL ) ) ) map_openai_chunk(data, done = FALSE, agg) expect_equal(chunks[1], "\n") expect_equal(chunks[2], "Let me think...") }) test_that("map_openai_chunk handles done signal", { done_called <- FALSE agg <- SSEAggregator$new(function(text, done) { if (done) done_called <<- TRUE }) map_openai_chunk(NULL, done = TRUE, agg) expect_true(done_called) }) test_that("map_openai_chunk handles tool call deltas", { agg <- SSEAggregator$new(function(text, done) {}) data <- list( choices = list( list( delta = list( tool_calls = list( list(index = 0, id = "call_1", `function` = list(name = "test", arguments = '{"x":1}')) ) ), finish_reason = "tool_calls" ) ), usage = list(prompt_tokens = 5, completion_tokens = 3, total_tokens = 8) ) map_openai_chunk(data, done = FALSE, agg) result <- agg$build_result() expect_equal(result$finish_reason, "tool_calls") expect_equal(result$tool_calls[[1]]$name, "test") expect_equal(result$usage$total_tokens, 8) }) # ============================================================================ # Anthropic Event Mapper Tests # ============================================================================ test_that("map_anthropic_chunk handles text_delta", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) event_data <- list( delta = list(type = "text_delta", text = "Hello from Claude") ) result <- map_anthropic_chunk("content_block_delta", event_data, agg) expect_false(result) expect_equal(chunks, "Hello from Claude") }) test_that("map_anthropic_chunk handles thinking_delta", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) event_data <- list( delta = list(type = "thinking_delta", thinking = "Considering...") ) map_anthropic_chunk("content_block_delta", event_data, agg) expect_equal(chunks[1], "\n") expect_equal(chunks[2], "Considering...") }) test_that("map_anthropic_chunk handles input_json_delta", { agg <- SSEAggregator$new(function(text, done) {}) # First: tool start start_data <- list( index = 0, content_block = list(type = "tool_use", id = "toolu_1", name = "search") ) map_anthropic_chunk("content_block_start", start_data, agg) # Then: input deltas delta_data <- list( index = 0, delta = list(type = "input_json_delta", partial_json = '{"query": "test"}') ) map_anthropic_chunk("content_block_delta", delta_data, agg) result <- agg$build_result() expect_equal(result$tool_calls[[1]]$name, "search") expect_equal(result$tool_calls[[1]]$arguments$query, "test") }) test_that("map_anthropic_chunk handles thinking block lifecycle", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) # content_block_start with thinking start_data <- list( index = 0, content_block = list(type = "thinking") ) map_anthropic_chunk("content_block_start", start_data, agg) # thinking_delta delta_data <- list( delta = list(type = "thinking_delta", thinking = "Deep thought") ) map_anthropic_chunk("content_block_delta", delta_data, agg) # content_block_stop closes thinking map_anthropic_chunk("content_block_stop", list(), agg) expect_equal(sum(chunks == "\n"), 1) expect_true("Deep thought" %in% chunks) expect_equal(sum(chunks == "\n\n\n"), 1) }) test_that("map_anthropic_chunk handles message_delta with stop_reason", { agg <- SSEAggregator$new(function(text, done) {}) event_data <- list( delta = list(stop_reason = "end_turn"), usage = list(output_tokens = 42) ) map_anthropic_chunk("message_delta", event_data, agg) result <- agg$build_result() expect_equal(result$finish_reason, "end_turn") expect_equal(result$usage$completion_tokens, 42) }) test_that("map_anthropic_chunk returns TRUE on message_stop", { done_called <- FALSE agg <- SSEAggregator$new(function(text, done) { if (done) done_called <<- TRUE }) result <- map_anthropic_chunk("message_stop", list(), agg) expect_true(result) expect_true(done_called) }) # ============================================================================ # Mixed Content Tests # ============================================================================ test_that("SSEAggregator handles reasoning then text then tool calls", { chunks <- character() agg <- SSEAggregator$new(function(text, done) { if (!done) chunks <<- c(chunks, text) }) # Reasoning phase agg$on_reasoning_delta("Let me think...") # Text phase (auto-closes reasoning) agg$on_text_delta("Based on my analysis, ") agg$on_text_delta("I need to call a tool.") # Tool call agg$on_tool_call_delta(list( list(index = 0, id = "call_1", `function` = list(name = "search", arguments = '{"q":"test"}')) )) agg$on_finish_reason("tool_calls") agg$on_done() result <- agg$build_result() expect_equal(result$reasoning, "Let me think...") expect_equal(result$text, "Based on my analysis, I need to call a tool.") expect_equal(result$finish_reason, "tool_calls") expect_equal(length(result$tool_calls), 1) expect_equal(result$tool_calls[[1]]$name, "search") }) test_that("Full OpenAI streaming simulation via map_openai_chunk", { chunks <- character() done_called <- FALSE agg <- SSEAggregator$new(function(text, done) { if (done) { done_called <<- TRUE } else { chunks <<- c(chunks, text) } }) # Simulate 3 SSE chunks map_openai_chunk(list(choices = list(list(delta = list(content = "Hi"), finish_reason = NULL))), FALSE, agg) map_openai_chunk(list(choices = list(list(delta = list(content = " there"), finish_reason = NULL))), FALSE, agg) map_openai_chunk(list( choices = list(list(delta = list(), finish_reason = "stop")), usage = list(prompt_tokens = 10, completion_tokens = 2, total_tokens = 12) ), FALSE, agg) map_openai_chunk(NULL, TRUE, agg) result <- agg$build_result() expect_equal(result$text, "Hi there") expect_equal(result$finish_reason, "stop") expect_equal(result$usage$total_tokens, 12) expect_true(done_called) })