Overview

The Pipecat integration monitors voice AI applications built on the open-source Pipecat Python framework. Add the roark_analytics[pipecat] observer to your existing pipeline, and call lifecycle events, transcripts, tool invocations, and recordings are forwarded to Roark automatically. The observer is deployment-agnostic: it works the same whether your pipeline runs self-hosted (any Python environment) or on Pipecat Cloud (Daily’s managed Pipecat hosting). The only requirement is that the observer is included in your pipeline at runtime.

Prerequisites

  • A running Pipecat application (Python 3.10+, pipecat-ai >= 0.0.40; tested with pipecat-ai 0.0.108)
  • A Roark API key with WRITE scope (generate one)

Setup Instructions

Step 1: Install the observer

pip install "roark_analytics[pipecat]"

Step 2: Configure your API key

The only setting you need to provide is your Roark API key.
.env
# Roark API key — create one on the API keys page in your Roark project.
ROARK_API_KEY=rk_live_replace_me
| Variable | Required | Purpose |
| --- | --- | --- |
| ROARK_API_KEY | Yes | Roark API key with WRITE scope. Read by the observer at construction; can also be passed as api_key=. |
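A missing key is easier to diagnose at startup than after a call has silently failed to report. The following is a minimal sketch of a fail-fast check, assuming the variable has already been loaded into the process environment (for example by your process manager or a dotenv loader). `require_env` is a hypothetical helper for illustration, not part of roark_analytics.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error.

    Hypothetical helper for illustration; not part of roark_analytics.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; export it or add it to your .env file")
    return value


if __name__ == "__main__":
    # Placeholder so the sketch runs standalone; use your real key in practice.
    os.environ.setdefault("ROARK_API_KEY", "rk_live_replace_me")
    require_env("ROARK_API_KEY")
    print("ROARK_API_KEY is set")
```

You could then pass the returned value as api_key= when constructing the observer.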

Step 3: Wire it into your pipeline

Construct a RoarkObserver, splice its audio_processor into your pipeline after transport.output() so the bot channel captures post-TTS audio, and pass the observer in PipelineParams(observers=[...]):
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService

from pipecat_roark import RoarkObserver

SYSTEM_PROMPT = (
    "You are a friendly voice assistant. Keep replies short — your output is "
    "spoken aloud."
)


async def run_bot(transport):
    stt = SpeechmaticsSTTService(api_key=os.environ["SPEECHMATICS_API_KEY"])
    llm = OpenAILLMService(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4o-mini")
    tts = OpenAITTSService(api_key=os.environ["OPENAI_API_KEY"], voice="nova")

    context = OpenAILLMContext(
        messages=[{"role": "system", "content": SYSTEM_PROMPT}],
    )
    context_aggregator = llm.create_context_aggregator(context)

    roark = RoarkObserver(
        api_key=os.environ["ROARK_API_KEY"],
        agent_id="support-bot-v1",
        agent_name="Support Bot",
        agent_prompt=SYSTEM_PROMPT,
    )

    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        roark.audio_processor,        # MUST sit after transport.output()
        context_aggregator.assistant(),
    ])

    task = PipelineTask(
        pipeline,
        params=PipelineParams(observers=[roark]),
    )

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        # Some transports (e.g. SmallWebRTC) tear down without pushing EndFrame.
        # Call aflush() so call-ended is still POSTed.
        await roark.aflush(reason="client-disconnected")
        await task.cancel()

    await PipelineRunner().run(task)
That’s the full integration — agent registration, transcripts, tool calls, and the merged recording are handled by the observer.
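Because the recording depends on the audio processor sitting after transport.output(), it can be worth checking the order before building the pipeline. The sketch below is one way to do that with plain Python lists; `assert_recorder_after_output` is a hypothetical helper, and the stand-in objects take the place of real Pipecat processors.

```python
from typing import Sequence


def assert_recorder_after_output(
    processors: Sequence[object], output: object, recorder: object
) -> None:
    """Raise ValueError unless `recorder` appears later in the list than `output`.

    Hypothetical helper; compares by identity, so pass the same objects
    that you put into the pipeline list.
    """
    positions = {id(p): i for i, p in enumerate(processors)}
    if id(output) not in positions or id(recorder) not in positions:
        raise ValueError("both the output and the recorder must be in the processor list")
    if positions[id(recorder)] <= positions[id(output)]:
        raise ValueError("the audio processor must come after transport.output()")


if __name__ == "__main__":
    # Stand-in objects; in a real bot these are your Pipecat processors.
    stt, llm, tts, output, recorder = (object() for _ in range(5))
    assert_recorder_after_output([stt, llm, tts, output, recorder], output, recorder)
    print("ordering ok")
```

In a real bot you would keep a reference to the value returned by transport.output() and pass it, together with roark.audio_processor, alongside the same list you hand to Pipeline.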
| Argument | Required | Description |
| --- | --- | --- |
| api_key | Yes | Your Roark API key (WRITE scope). Falls back to ROARK_API_KEY. |
| agent_id | Yes | Stable agent identifier |
| agent_name | No | Display name shown in the Roark dashboard |
| agent_prompt | No | System prompt, persisted as the agent’s prompt revision |
| audio_buffer_processor | No | Power-user override: pass your own AudioBufferProcessor to control sample rate, channel count, or buffer size. If omitted, the observer creates a default one (stereo, ~256 KB chunks; sample rate adopted from the pipeline’s StartFrame) accessible via observer.audio_processor. |
| pipecat_call_id | No | Stable call identifier. Generated internally if omitted. Pass the same value to PipelineTask(conversation_id=...) to correlate with OTel traces; see Observability → Traces |
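To correlate a call with its traces, the same identifier has to reach both the observer and the task. The sketch below generates one identifier and prepares the keyword arguments for each side. Using a UUID is an assumption made here for illustration (the required format is not specified above), and `new_call_id` is a hypothetical helper.

```python
import uuid


def new_call_id() -> str:
    """Generate a unique identifier to share between the observer and the task.

    Hypothetical helper; the UUID format is an assumption, not a requirement.
    """
    return str(uuid.uuid4())


call_id = new_call_id()

# Keyword arguments to merge with the other arguments shown in Step 3.
observer_kwargs = {"agent_id": "support-bot-v1", "pipecat_call_id": call_id}
task_kwargs = {"conversation_id": call_id}

# In your bot:
#   roark = RoarkObserver(api_key=..., **observer_kwargs)
#   task = PipelineTask(pipeline, params=..., **task_kwargs)
```

Generating the identifier once per call, before either object is constructed, avoids the two sides drifting apart.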

Power-user: bring your own AudioBufferProcessor

If you need to tune sample rate, channel count, or buffer size, instantiate AudioBufferProcessor yourself and pass it via audio_buffer_processor=. Splice that same instance into your pipeline after transport.output() in place of roark.audio_processor:
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor

audio_buffer = AudioBufferProcessor(
    sample_rate=16000,
    num_channels=1,
    buffer_size=128 * 1024,
)

pipeline = Pipeline([..., transport.output(), audio_buffer, ...])

roark = RoarkObserver(
    api_key=os.environ["ROARK_API_KEY"],
    agent_id="support-bot-v1",
    audio_buffer_processor=audio_buffer,
)
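When choosing a buffer size, it helps to know how much audio one chunk holds. The arithmetic below is a rough sketch that assumes buffer_size is measured in bytes of uncompressed 16-bit PCM; both the unit and the sample width are assumptions to verify against your Pipecat version. `buffer_seconds` is a hypothetical helper.

```python
def buffer_seconds(
    buffer_size: int,
    sample_rate: int,
    num_channels: int,
    bytes_per_sample: int = 2,  # 16-bit PCM (assumption)
) -> float:
    """Seconds of audio that fit in `buffer_size` bytes of uncompressed PCM."""
    bytes_per_second = sample_rate * num_channels * bytes_per_sample
    return buffer_size / bytes_per_second


# The override above: 128 KiB buffer, 16 kHz, mono.
custom = buffer_seconds(128 * 1024, sample_rate=16000, num_channels=1)

# The default (stereo, ~256 KB chunks), here assumed to run at 16 kHz.
default = buffer_seconds(256 * 1024, sample_rate=16000, num_channels=2)

print(f"custom: {custom:.3f} s, default at 16 kHz: {default:.3f} s")
# prints "custom: 4.096 s, default at 16 kHz: 4.096 s"
```

Under these assumptions, halving both the buffer size and the channel count leaves the chunk duration unchanged at roughly four seconds.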

Step 4: Verify the connection

Run a test call through your Pipecat pipeline. Within a few seconds you should see:
  1. The agent appear in your Roark dashboard under the Pipecat source filter
  2. The call appear in the calls table with status In Progress
  3. After the call ends, the transcript, tool invocations, and merged recording attached to the call

What Gets Synced

The roark_analytics[pipecat] observer forwards:
  • Calls — Lifecycle with timing and end reason
  • Agents — Lazy-registered on first sight using the agent_id / agent_name from the observer
  • Prompts — System prompt captured at call start
  • Transcripts — Per-turn messages with role, content, and timestamp
  • Tool Invocations — Tool call IDs, names, JSON arguments, and results
  • Recordings — Pre-mixed stereo PCM audio streamed in chunks during the call and merged into a single WAV at call-end
Roark only sees what the observer forwards. If you remove the observer from a pipeline — whether self-hosted or running on Pipecat Cloud — no data flows for those calls.
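To make the Recordings item concrete, the sketch below shows what merging raw PCM chunks into a single WAV can look like using only the Python standard library. It illustrates the concept and is not Roark's implementation; `merge_pcm_chunks` is a hypothetical helper, and 16-bit samples are an assumption.

```python
import io
import wave
from typing import Iterable


def merge_pcm_chunks(
    chunks: Iterable[bytes], sample_rate: int, num_channels: int = 2
) -> bytes:
    """Concatenate raw 16-bit PCM chunks and wrap them in a WAV container."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav:
        wav.setnchannels(num_channels)
        wav.setsampwidth(2)  # 16-bit samples (assumption)
        wav.setframerate(sample_rate)
        for chunk in chunks:
            wav.writeframes(chunk)
    return buffer.getvalue()


if __name__ == "__main__":
    # Two half-second chunks of stereo silence at 16 kHz (4 bytes per frame).
    half_second = b"\x00" * (8000 * 4)
    data = merge_pcm_chunks([half_second, half_second], sample_rate=16000)
    with wave.open(io.BytesIO(data), "rb") as wav:
        print(wav.getnframes() / wav.getframerate())  # prints 1.0
```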

Agent Management

Pipecat agents are lazy-registered the first time the observer reports them:
  • The first call-started event with a new agent_id creates the agent in Roark
  • Subsequent events update the agent’s name and prompt if they change
  • Pipecat-sourced agents appear in the agents page under the Pipecat source filter
Once an agent exists, it can be used in simulations, run plans, and agent reports just like agents from any other provider.
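The create-then-update behavior described above can be modeled with a small in-memory registry. This is a conceptual sketch only: `AgentRecord` and `AgentRegistry` are hypothetical names, and treating an omitted name or prompt as "leave unchanged" is an assumption rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class AgentRecord:
    agent_id: str
    name: Optional[str] = None
    prompt: Optional[str] = None


class AgentRegistry:
    """In-memory stand-in for lazy registration keyed by agent_id."""

    def __init__(self) -> None:
        self._agents: Dict[str, AgentRecord] = {}

    def report(
        self, agent_id: str, name: Optional[str] = None, prompt: Optional[str] = None
    ) -> AgentRecord:
        record = self._agents.get(agent_id)
        if record is None:
            # First sighting of this agent_id: create the agent.
            record = AgentRecord(agent_id, name, prompt)
            self._agents[agent_id] = record
            return record
        # Later sightings: update only the fields that were provided and changed.
        if name is not None and name != record.name:
            record.name = name
        if prompt is not None and prompt != record.prompt:
            record.prompt = prompt
        return record

    def __len__(self) -> int:
        return len(self._agents)


if __name__ == "__main__":
    registry = AgentRegistry()
    registry.report("support-bot-v1", name="Support Bot", prompt="v1 prompt")
    updated = registry.report("support-bot-v1", prompt="v2 prompt")
    print(len(registry), updated.name, updated.prompt)
```

The point of the model is that agent_id is the stable key: changing agent_name or agent_prompt updates the existing agent, while changing agent_id creates a new one.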

Deploying on Pipecat Cloud

The observer is a drop-in Pipecat component, so the same wiring works on Pipecat Cloud. Two things to do:
  1. Add roark_analytics[pipecat] to the requirements.txt bundled with your bot image
  2. Expose your ROARK_API_KEY to the deployment as a Pipecat Cloud secret — the observer reads it at runtime
bot.py
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.transports.services.daily import DailyParams, DailyTransport

from pipecat_roark import RoarkObserver


async def bot(args):
    transport = DailyTransport(
        args.room_url,
        args.token,
        "Roark Bot",
        DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    )

    # ... define SYSTEM_PROMPT and build stt / llm / tts / context_aggregator as in Step 3 ...

    roark = RoarkObserver(
        api_key=os.environ["ROARK_API_KEY"],
        agent_id="support-bot-v1",
        agent_name="Support Bot",
        agent_prompt=SYSTEM_PROMPT,
    )

    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        roark.audio_processor,
        context_aggregator.assistant(),
    ])

    task = PipelineTask(pipeline, params=PipelineParams(observers=[roark]))

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        await roark.aflush(reason=f"participant-left:{reason}")
        await task.cancel()

    await PipelineRunner().run(task)
requirements.txt
pipecat-ai[daily,openai,silero]
roark_analytics[pipecat]
No Roark-specific Pipecat Cloud configuration is required beyond the secret.

Monitoring Integration Health

Integration health is observed through the calls and agents that arrive in Roark:
  • No recent calls — Check that the observer is attached to your pipeline and that api_key is correct (on Pipecat Cloud, verify the ROARK_API_KEY secret is set on the deployment)
  • Calls created but no recordings — Confirm roark.audio_processor sits after transport.output() in the pipeline
  • Calls created but no transcript — Verify that your STT processor emits final TranscriptionFrames before the observer (interim transcriptions are ignored)
  • Call never ends in Roark — Some transports (notably SmallWebRTC) tear down without pushing EndFrame. Call await roark.aflush() from your disconnect handler — aflush() is idempotent, so the regular EndFrame path will no-op if both fire
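The idempotency mentioned in the last item is what makes it safe to flush from both a disconnect handler and the normal end-of-call path. The sketch below shows the general pattern with asyncio; `FlushOnce` is a hypothetical class written for illustration and does not reflect how the observer is implemented internally.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class FlushOnce:
    """Run an async callback at most once, however many times flush() is awaited."""

    def __init__(self, send_call_ended: Callable[[str], Awaitable[None]]) -> None:
        self._send_call_ended = send_call_ended
        self._flushed = False

    async def flush(self, reason: str = "unknown") -> bool:
        if self._flushed:
            return False  # A previous call already reported the end of the call.
        # Set the flag before awaiting so a concurrent caller sees it.
        self._flushed = True
        await self._send_call_ended(reason)
        return True


async def main() -> Tuple[List[str], List[bool]]:
    sent: List[str] = []

    async def send_call_ended(reason: str) -> None:
        sent.append(reason)

    flusher = FlushOnce(send_call_ended)
    # Simulate the disconnect handler and the EndFrame path both firing.
    results = await asyncio.gather(
        flusher.flush("client-disconnected"),
        flusher.flush("end-frame"),
    )
    return sent, list(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Only the first caller triggers the callback; the second returns without doing any work, which is the behavior you want when two teardown paths race.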

Next Steps

  • Configure traces: Send OpenTelemetry traces to Roark
  • Run Simulations: Test your Pipecat agents
  • Configure Evaluators: Set up evaluation criteria
  • Integration Overview: Explore other integrations