
Building an ethics-aware chatbot: complete tutorial

Build a chatbot with built-in ethical guardrails using OpenAI, RAIL Score SDK, and real-time safety evaluation.

RAIL Team
October 28, 2025
25 min read
Figure: Ethics-aware chatbot architecture with RAIL monitoring layer. (1) User Input: raw message from user. (2) Chatbot LLM: generates candidate response. (3) RAIL Monitor: scores candidate response across 8 dimensions before delivery. (4) Decision Gate: score >= threshold, deliver; score < threshold, regenerate or escalate. (5) Final Response: delivered only after passing RAIL evaluation.

Overview

Category: Engineering

Published: November 5, 2025

Introduction

Chatbots powered by large language models are now ubiquitous across customer service, healthcare, education, and internal applications. However, as the "AI Safety Incidents of 2024" article documents, chatbots without adequate safeguards risk several critical failures:

  • Delivering harmful guidance (ChatGPT mental health incidents)
  • Recommending illegal actions (NYC MyCity chatbot example)
  • Making biased or discriminatory statements
  • Exposing sensitive user information
  • Generating confident false information

This tutorial shows developers how to build an "ethics-aware" chatbot with integrated safety evaluation, bias detection, and protective guardrails.

    What You'll Build

  • Production-ready chatbot incorporating safety oversight
  • Real-time bias identification capabilities
  • Adjustable safety rating benchmarks
  • Automatic escalation processes for sensitive content
  • Compliance-focused audit documentation
  • Appropriate responses to problematic requests

    Technical Requirements

    Stack Components:

  • Python 3.10+ (the code below uses the str | None union syntax, which is not available in 3.9)
  • OpenAI GPT-4 (or Claude, Gemini -- framework-agnostic)
  • RAIL Score for safety evaluation
  • FastAPI backend
  • React frontend (optional)

    Prerequisites:

  • Python programming competency
  • LLM foundational knowledge
  • API credentials for OpenAI and RAIL Score

    Architecture Overview

    The system implements a five-stage evaluation pipeline:

  • User Input -- Raw user message
  • Chatbot LLM -- Generates candidate response
  • RAIL Monitor -- Evaluates response across 8 dimensions before delivery
  • Decision Gate -- Delivers if score meets threshold; otherwise regenerates or escalates
  • Final Response -- Released only after passing RAIL assessment
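
The five stages above can be sketched as a single loop. This is a minimal illustration rather than the tutorial's implementation: `run_pipeline`, `generate`, and `score` are hypothetical names, and the stub callables stand in for the LLM call and the RAIL evaluation.

```python
from typing import Callable


def run_pipeline(
    user_input: str,                      # stage 1: raw user message
    generate: Callable[[str], str],       # stage 2: stand-in for the chatbot LLM
    score: Callable[[str, str], float],   # stage 3: stand-in for the RAIL monitor
    threshold: float = 7.0,
    max_attempts: int = 3,
) -> dict:
    """Generate, score, and gate a response; escalate if no attempt passes."""
    for attempt in range(1, max_attempts + 1):
        candidate = generate(user_input)
        candidate_score = score(user_input, candidate)
        # Stage 4: decision gate
        if candidate_score >= threshold:
            # Stage 5: only a passing candidate is delivered
            return {"status": "delivered", "response": candidate, "attempts": attempt}
    return {"status": "escalated", "response": None, "attempts": max_attempts}


# Stub stages for illustration only: the second draft scores higher than the first.
drafts = iter(["risky draft", "careful draft"])
scores = {"risky draft": 5.0, "careful draft": 8.5}

result = run_pipeline(
    "example question",
    generate=lambda _msg: next(drafts),
    score=lambda _msg, reply: scores[reply],
)
print(result)
```

The full implementation that follows refines the gate into five routing decisions instead of a single pass/fail check.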

    Full Code Implementation

    Project Structure

    text
    ethics-chatbot/
    ├── main.py            # FastAPI app and routes
    ├── chatbot.py         # LLM generation layer
    ├── rail_guard.py      # RAIL Score evaluation middleware
    ├── router.py          # Response routing logic
    ├── config.py          # Threshold profiles and settings
    ├── audit.py           # Audit logging
    ├── requirements.txt
    └── tests/
        └── test_safety_pipeline.py
    

    Dependencies

    text
    # requirements.txt
    fastapi>=0.111.0
    uvicorn[standard]>=0.29.0
    openai>=1.50.0
    rail-score>=2.4.0
    pydantic>=2.0.0
    python-dotenv>=1.0.0
    httpx>=0.27.0
    pytest>=8.0.0
    pytest-asyncio>=0.23.0
    

    config.py: threshold profiles

    python
    # config.py
    import os
    from dataclasses import dataclass, field
    from dotenv import load_dotenv
    
    load_dotenv()
    
    OPENAI_API_KEY: str = os.environ["OPENAI_API_KEY"]
    RAIL_API_KEY: str = os.environ["RAIL_API_KEY"]
    LLM_MODEL: str = os.getenv("LLM_MODEL", "gpt-4o-mini")
    MAX_REGENERATION_ATTEMPTS: int = int(os.getenv("MAX_REGEN_ATTEMPTS", "2"))
    
    
    @dataclass
    class ThresholdProfile:
        """Per-deployment safety thresholds."""
        name: str
        overall_min: float
        confidence_min: float
        # Dimension floors override the overall_min for specific dimensions
        dimension_floors: dict = field(default_factory=dict)
        # Dimensions that trigger immediate human escalation when below floor
        escalate_dims: list = field(default_factory=list)
    
    
    PROFILES: dict[str, ThresholdProfile] = {
        "general": ThresholdProfile(
            name="general",
            overall_min=7.0,
            confidence_min=0.70,
        ),
        "customer_support": ThresholdProfile(
            name="customer_support",
            overall_min=7.5,
            confidence_min=0.75,
            dimension_floors={"user_impact": 7.0, "transparency": 7.0},
            escalate_dims=[],
        ),
        "healthcare": ThresholdProfile(
            name="healthcare",
            overall_min=8.0,
            confidence_min=0.80,
            dimension_floors={"safety": 9.0, "reliability": 8.5, "accountability": 8.0},
            escalate_dims=["safety"],  # Any safety score < floor → human handoff
        ),
        "financial": ThresholdProfile(
            name="financial",
            overall_min=7.5,
            confidence_min=0.80,
            dimension_floors={"accountability": 8.0, "transparency": 8.0, "reliability": 8.0},
            escalate_dims=["accountability"],
        ),
        "children": ThresholdProfile(
            name="children",
            overall_min=8.5,
            confidence_min=0.85,
            dimension_floors={"safety": 9.5, "inclusivity": 8.5, "fairness": 8.5},
            escalate_dims=["safety", "fairness"],
        ),
    }
    
    SYSTEM_PROMPTS: dict[str, str] = {
        "general": "You are a helpful assistant. Provide accurate, balanced, and thoughtful responses.",
        "customer_support": (
            "You are a customer support agent. Be helpful, concise, and professional. "
            "Escalate issues you cannot resolve. Do not make promises about refunds or "
            "policy exceptions without authorization."
        ),
        "healthcare": (
            "You are a health information assistant. Provide evidence-based health information. "
            "Always recommend consulting a qualified healthcare provider for medical decisions. "
            "Never diagnose conditions or prescribe treatments."
        ),
        "financial": (
            "You are a financial information assistant. Provide general financial education. "
            "Always clarify that this is not financial advice and recommend consulting a "
            "licensed financial advisor for personal investment decisions."
        ),
    }
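
To make the floor semantics concrete, the sketch below shows how a per-dimension floor falls back to `overall_min` when no floor is configured. `effective_floor` is a hypothetical helper (the tutorial inlines the same lookup in rail_guard.py), the dataclass is a trimmed copy so the snippet runs on its own, and the scores are made up.

```python
from dataclasses import dataclass, field


@dataclass
class ThresholdProfile:
    """Trimmed copy of the profile class from config.py, so this runs standalone."""
    name: str
    overall_min: float
    confidence_min: float
    dimension_floors: dict = field(default_factory=dict)
    escalate_dims: list = field(default_factory=list)


def effective_floor(profile: ThresholdProfile, dimension: str) -> float:
    """Hypothetical helper: a dimension's floor, falling back to overall_min."""
    return profile.dimension_floors.get(dimension, profile.overall_min)


healthcare = ThresholdProfile(
    name="healthcare",
    overall_min=8.0,
    confidence_min=0.80,
    dimension_floors={"safety": 9.0, "reliability": 8.5, "accountability": 8.0},
    escalate_dims=["safety"],
)

# Made-up dimension scores for illustration
example_scores = {"safety": 8.8, "reliability": 9.0, "privacy": 7.9}

below_floor = [
    dim for dim, value in example_scores.items()
    if value < effective_floor(healthcare, dim)
]
print(below_floor)  # safety misses its 9.0 floor; privacy misses the 8.0 overall minimum
```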
    

    chatbot.py: LLM generation layer

    python
    # chatbot.py
    import logging
    from openai import OpenAI
    from config import OPENAI_API_KEY, LLM_MODEL, SYSTEM_PROMPTS
    
    logger = logging.getLogger(__name__)
    openai_client = OpenAI(api_key=OPENAI_API_KEY)
    
    
    def generate_response(
        user_message: str,
        conversation_history: list[dict],
        deployment: str = "general",
        regeneration_hint: str | None = None,
    ) -> str:
        """
        Generate a candidate response from the LLM.
    
        When called for regeneration, regeneration_hint contains the RAIL evaluation
        feedback so the model can self-correct on the second attempt.
        """
        system_prompt = SYSTEM_PROMPTS.get(deployment, SYSTEM_PROMPTS["general"])
    
        if regeneration_hint:
            system_prompt += (
                f"\n\nIMPORTANT: Your previous response did not meet safety standards. "
                f"Issues identified: {regeneration_hint}\n"
                f"Generate an improved response that addresses these specific concerns."
            )
    
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(conversation_history)
        messages.append({"role": "user", "content": user_message})
    
        try:
            completion = openai_client.chat.completions.create(
                model=LLM_MODEL,
                messages=messages,
                temperature=0.3,
                max_tokens=1024,
            )
            # message.content can be None, so fall back to an empty string
            response = completion.choices[0].message.content or ""
            logger.debug(f"Generated response ({len(response)} chars)")
            return response
    
        except Exception as e:
            logger.error(f"LLM generation failed: {e}")
            raise
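
The message assembly can be checked without calling the API. The sketch below pulls that step into a hypothetical `build_messages` helper (not part of the tutorial's modules) to show what changes between the first attempt and a regeneration: only the system prompt grows, while the history and user message stay the same.

```python
from typing import Optional


def build_messages(
    system_prompt: str,
    conversation_history: list,
    user_message: str,
    regeneration_hint: Optional[str] = None,
) -> list:
    """Hypothetical helper mirroring how generate_response assembles its messages."""
    if regeneration_hint:
        system_prompt += (
            "\n\nIMPORTANT: Your previous response did not meet safety standards. "
            f"Issues identified: {regeneration_hint}\n"
            "Generate an improved response that addresses these specific concerns."
        )
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": user_message})
    return messages


history = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello! How can I help?"},
]
base_prompt = "You are a helpful assistant."
question = "Is this supplement safe?"

first_try = build_messages(base_prompt, history, question)
retry = build_messages(
    base_prompt, history, question,
    regeneration_hint="safety: Does not recommend consulting a clinician.",
)

print(len(first_try), len(retry))  # same number of messages on both attempts
print(retry[0]["content"])         # system prompt now carries the evaluation feedback
```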
    

    rail_guard.py: RAIL Score evaluation middleware

    python
    # rail_guard.py
    import logging
    from dataclasses import dataclass
    from enum import Enum
    
    from rail_score import RAILClient
    from rail_score.exceptions import RAILAPIError, RAILRateLimitError
    from config import RAIL_API_KEY, ThresholdProfile
    
    logger = logging.getLogger(__name__)
    rail_client = RAILClient(api_key=RAIL_API_KEY)
    
    
    class RouteDecision(str, Enum):
        DELIVER = "deliver"          # Score and confidence meet the profile minimums -- send to user
        REGENERATE = "regenerate"    # Score below the disclaimer band but >= 4.0 -- retry with hints
        DISCLAIMER = "disclaimer"    # Score within 1.5 points of the minimum, or confidence too low -- attach disclaimer
        ESCALATE = "escalate"        # Escalation dimension below its floor -- route to human
        BLOCK = "block"              # Score below 4.0 -- return safe fallback
    
    
    @dataclass
    class EvaluationOutcome:
        decision: RouteDecision
        rail_score: float
        confidence: float
        dimension_scores: dict
        flagged_dimensions: list[str]
        escalate_dimensions: list[str]
        explanations: dict
        regeneration_hint: str
        request_id: str
    
    
    def evaluate_response(
        prompt: str,
        response: str,
        profile: ThresholdProfile,
        depth: str = "deep",
    ) -> EvaluationOutcome:
        """
        Evaluate a candidate response and return a routing decision.
    
        Uses depth="basic" for fast-path cases and depth="deep" when more
        analysis is needed (configured via caller).
        """
        try:
            result = rail_client.evaluate(
                prompt=prompt,
                response=response,
                dimensions="all",
                depth=depth,
            )
        except RAILRateLimitError:
            logger.warning("RAIL rate limit hit -- failing open with warning")
            # Fail open: deliver with a logged warning rather than blocking the user
            return EvaluationOutcome(
                decision=RouteDecision.DELIVER,
                rail_score=-1.0,
                confidence=-1.0,
                dimension_scores={},
                flagged_dimensions=[],
                escalate_dimensions=[],
                explanations={"_error": "Evaluation unavailable (rate limit)"},
                regeneration_hint="",
                request_id="rate_limited",
            )
        except RAILAPIError as e:
            logger.error(f"RAIL evaluation error: {e}")
            return EvaluationOutcome(
                decision=RouteDecision.DELIVER,
                rail_score=-1.0,
                confidence=-1.0,
                dimension_scores={},
                flagged_dimensions=[],
                escalate_dimensions=[],
                explanations={"_error": str(e)},
                regeneration_hint="",
                request_id="api_error",
            )
    
        # Identify dimensions below their configured floors
        flagged = []
        for dim, score in result.dimensions.items():
            floor = profile.dimension_floors.get(dim, profile.overall_min)
            if score < floor:
                flagged.append(dim)
    
        # Identify dimensions that require immediate human escalation
        must_escalate = [
            dim for dim in profile.escalate_dims
            if result.dimensions.get(dim, 10.0) < profile.dimension_floors.get(dim, profile.overall_min)
        ]
    
        # Build a regeneration hint from explanations of flagged dimensions
        hint_parts = []
        for dim in flagged:
            explanation = result.explanations.get(dim, "")
            if explanation:
                hint_parts.append(f"{dim}: {explanation}")
        regeneration_hint = " | ".join(hint_parts)
    
        # Routing decision logic
        if must_escalate:
            decision = RouteDecision.ESCALATE
        elif result.rail_score >= profile.overall_min and result.confidence >= profile.confidence_min:
            decision = RouteDecision.DELIVER
        elif result.rail_score >= profile.overall_min - 1.5:
            # Within 1.5 points of threshold -- attach disclaimer rather than blocking
            decision = RouteDecision.DISCLAIMER
        elif result.rail_score >= 4.0:
            # Recoverable -- try regeneration
            decision = RouteDecision.REGENERATE
        else:
            # Below recovery threshold
            decision = RouteDecision.BLOCK
    
        return EvaluationOutcome(
            decision=decision,
            rail_score=result.rail_score,
            confidence=result.confidence,
            dimension_scores=result.dimensions,
            flagged_dimensions=flagged,
            escalate_dimensions=must_escalate,
            explanations=result.explanations,
            regeneration_hint=regeneration_hint,
            request_id=result.request_id,
        )
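
The routing bands are easier to see in isolation. The sketch below restates the decision logic from `evaluate_response` as a hypothetical pure function, `decide`, using plain strings instead of the `RouteDecision` enum, and runs it against the "general" profile values (overall minimum 7.0, confidence minimum 0.70).

```python
def decide(
    score: float,
    confidence: float,
    overall_min: float,
    confidence_min: float,
    must_escalate: bool = False,
) -> str:
    """Hypothetical restatement of the routing bands in evaluate_response."""
    if must_escalate:
        return "escalate"
    if score >= overall_min and confidence >= confidence_min:
        return "deliver"
    if score >= overall_min - 1.5:
        return "disclaimer"
    if score >= 4.0:
        return "regenerate"
    return "block"


# (score, confidence, must_escalate) examples against the "general" profile
examples = [
    (8.2, 0.90, False),  # passes both minimums
    (8.2, 0.50, False),  # high score, but the evaluator is not confident
    (6.0, 0.90, False),  # within 1.5 points of the minimum
    (5.0, 0.90, False),  # recoverable, worth a retry
    (3.1, 0.90, False),  # below the recovery threshold
    (9.0, 0.90, True),   # an escalation dimension fell below its floor
]
decisions = [decide(s, c, 7.0, 0.70, must_escalate=e) for s, c, e in examples]
print(decisions)
```

Two properties are worth noticing: escalation wins over everything else, and a high score with low confidence lands in the disclaimer band rather than being delivered as-is.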
    

    router.py: response router

    python
    # router.py
    import logging
    from chatbot import generate_response
    from rail_guard import evaluate_response, EvaluationOutcome, RouteDecision
    from config import ThresholdProfile, MAX_REGENERATION_ATTEMPTS
    
    logger = logging.getLogger(__name__)
    
    SAFE_FALLBACK = (
        "I'm not able to provide a helpful response to that question in a way that "
        "meets our safety standards. If you need assistance, please reach out to our "
        "support team directly."
    )
    
    ESCALATION_MESSAGE = (
        "This question involves sensitive content that I want to make sure is handled "
        "carefully. I'm connecting you with a human specialist who can help you properly."
    )
    
    DISCLAIMER_TEMPLATE = (
        "{response}\n\n"
        "---\n"
        "*Note: This response is provided for informational purposes only. "
        "For decisions that affect your health, finances, or legal situation, "
        "please consult a qualified professional.*"
    )
    
    
    def route_response(
        user_message: str,
        conversation_history: list[dict],
        deployment: str,
        profile: ThresholdProfile,
    ) -> dict:
        """
        Full pipeline: generate → evaluate → route.
    
        Returns a dict with the final response text, routing metadata,
        and audit data for logging.
        """
        audit_trail = {
            "user_message": user_message,
            "deployment": deployment,
            "attempts": [],
        }
    
        # Generation + evaluation loop with regeneration
        for attempt in range(MAX_REGENERATION_ATTEMPTS + 1):
            regeneration_hint = None
            if attempt > 0:
                prev_outcome: EvaluationOutcome = audit_trail["attempts"][-1]["outcome"]
                regeneration_hint = prev_outcome.regeneration_hint
                logger.info(f"Regeneration attempt {attempt} with hint: {regeneration_hint[:120]}...")
    
            candidate = generate_response(
                user_message=user_message,
                conversation_history=conversation_history,
                deployment=deployment,
                regeneration_hint=regeneration_hint,
            )
    
            depth = "basic" if attempt == 0 and len(candidate) < 300 else "deep"
            outcome = evaluate_response(
                prompt=user_message,
                response=candidate,
                profile=profile,
                depth=depth,
            )
    
            audit_trail["attempts"].append({
                "attempt": attempt,
                "candidate_length": len(candidate),
                "outcome": outcome,
                "rail_score": outcome.rail_score,
                "decision": outcome.decision,
            })
    
            logger.info(
                f"Attempt {attempt}: score={outcome.rail_score:.1f} decision={outcome.decision} "
                f"request_id={outcome.request_id}"
            )
    
            if outcome.decision == RouteDecision.DELIVER:
                audit_trail["final_decision"] = "delivered"
                return {
                    "response": candidate,
                    "delivered": True,
                    "escalated": False,
                    "rail_score": outcome.rail_score,
                    "request_id": outcome.request_id,
                    "audit": audit_trail,
                }
    
            if outcome.decision == RouteDecision.DISCLAIMER:
                audit_trail["final_decision"] = "delivered_with_disclaimer"
                return {
                    "response": DISCLAIMER_TEMPLATE.format(response=candidate),
                    "delivered": True,
                    "escalated": False,
                    "rail_score": outcome.rail_score,
                    "request_id": outcome.request_id,
                    "audit": audit_trail,
                }
    
            if outcome.decision == RouteDecision.ESCALATE:
                audit_trail["final_decision"] = "escalated"
                return {
                    "response": ESCALATION_MESSAGE,
                    "delivered": False,
                    "escalated": True,
                    "escalate_dimensions": outcome.escalate_dimensions,
                    "rail_score": outcome.rail_score,
                    "request_id": outcome.request_id,
                    "audit": audit_trail,
                }
    
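            # BLOCK -- the score is too low to recover, so stop without regenerating
            if outcome.decision == RouteDecision.BLOCK:
                break

            # REGENERATE -- loop continues if attempts remain
            if attempt == MAX_REGENERATION_ATTEMPTS:
                break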
    
        # All regeneration attempts exhausted or BLOCK decision
        audit_trail["final_decision"] = "blocked"
        return {
            "response": SAFE_FALLBACK,
            "delivered": False,
            "escalated": False,
            "rail_score": audit_trail["attempts"][-1]["rail_score"],
            "request_id": audit_trail["attempts"][-1]["outcome"].request_id,
            "audit": audit_trail,
        }
    

    main.py: FastAPI application

    python
    # main.py
    import logging
    import uuid
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
    from router import route_response
    from audit import log_interaction
    from config import PROFILES
    
    logging.basicConfig(level=logging.INFO)
    app = FastAPI(title="Ethics-Aware Chatbot", version="1.0.0")
    
    
    class ChatRequest(BaseModel):
        message: str
        conversation_history: list[dict] = []
        deployment: str = "general"
        session_id: str | None = None
    
    
    class ChatResponse(BaseModel):
        response: str
        rail_score: float
        delivered: bool
        escalated: bool
        request_id: str
        session_id: str
    
    
    @app.post("/chat", response_model=ChatResponse)
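    def chat(request: ChatRequest):
        # Plain `def` rather than `async def`: route_response makes blocking LLM and
        # RAIL calls, and FastAPI runs sync endpoints in a threadpool so they do not
        # stall the event loop.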
        if request.deployment not in PROFILES:
            raise HTTPException(status_code=400, detail=f"Unknown deployment: {request.deployment}")
    
        profile = PROFILES[request.deployment]
        session_id = request.session_id or str(uuid.uuid4())
    
        result = route_response(
            user_message=request.message,
            conversation_history=request.conversation_history,
            deployment=request.deployment,
            profile=profile,
        )
    
        # Audit every interaction regardless of routing decision
        log_interaction(
            session_id=session_id,
            user_message=request.message,
            final_response=result["response"],
            rail_score=result["rail_score"],
            delivered=result["delivered"],
            escalated=result["escalated"],
            request_id=result["request_id"],
            audit_trail=result["audit"],
        )
    
        return ChatResponse(
            response=result["response"],
            rail_score=result["rail_score"],
            delivered=result["delivered"],
            escalated=result["escalated"],
            request_id=result["request_id"],
            session_id=session_id,
        )
    
    
    @app.get("/health")
    async def health():
        return {"status": "ok"}
    

    audit.py: audit logging

    python
    # audit.py
    import json
    import logging
    from datetime import datetime, timezone
    
    logger = logging.getLogger("rail.audit")
    
    
    def log_interaction(
        session_id: str,
        user_message: str,
        final_response: str,
        rail_score: float,
        delivered: bool,
        escalated: bool,
        request_id: str,
        audit_trail: dict,
    ) -> None:
        """
        Write a structured audit log entry for every chatbot interaction.
    
        In production, replace the logger call with writes to your audit
        store (BigQuery, PostgreSQL, CloudWatch, etc.).
        """
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": session_id,
            "rail_request_id": request_id,
            "rail_score": rail_score,
            "delivered": delivered,
            "escalated": escalated,
            "attempt_count": len(audit_trail.get("attempts", [])),
            "final_decision": audit_trail.get("final_decision"),
            "deployment": audit_trail.get("deployment"),
            # Truncate for log size; store full text in a separate store if needed
            "user_message_snippet": user_message[:200],
            "response_snippet": final_response[:200],
        }
        logger.info(json.dumps(entry))
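
Because `log_interaction` writes to the named logger `rail.audit`, the entries can be routed to their own sink with standard logging configuration. The sketch below is one way to do that, assuming a JSON-lines destination; it uses an in-memory stream so it runs standalone, and the sample entry is made up.

```python
import io
import json
import logging

audit_logger = logging.getLogger("rail.audit")
audit_logger.setLevel(logging.INFO)
audit_logger.propagate = False  # keep audit entries out of the root logger's output

# An in-memory stream keeps the example self-contained; for a real file, use
# logging.FileHandler("audit.jsonl") instead (the path is a placeholder).
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(message)s"))  # the message is already JSON
audit_logger.addHandler(handler)

# Stand-in for the json.dumps(entry) call inside log_interaction
audit_logger.info(json.dumps({"session_id": "demo", "final_decision": "delivered"}))

# Each line parses back into one audit entry
entries = [json.loads(line) for line in stream.getvalue().splitlines()]
print(entries)
```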
    

    Configuring Thresholds per Use Case

    The threshold profiles in config.py are the primary lever for tuning safety vs. helpfulness. Here is the decision framework for setting thresholds:

    Overall minimum controls how many responses get blocked or escalated. A threshold of 7.0 blocks roughly 5--15% of responses for a general-purpose chatbot; 8.0 blocks 20--35%. Start at 7.0 for general use cases and raise it based on observed false-negative incidents.

    Dimension floors let you enforce stricter standards on specific dimensions without raising the overall bar. A healthcare chatbot can have a safety floor of 9.0 while leaving other dimensions at the overall minimum -- so a response that is perfectly helpful and transparent but contains even a minor safety concern gets flagged.

    Escalate dimensions are your circuit breakers. When a configured dimension falls below its floor, skip regeneration entirely and route to a human. Use this sparingly -- only for dimensions where an incorrect automated response has real-world consequences that cannot be undone by a retry.

    Confidence minimum filters out evaluations where the scoring model is uncertain. A score of 8.0 with confidence 0.5 is less trustworthy than a score of 7.2 with confidence 0.9. In practice, set confidence_min between 0.70 and 0.80; below that, treat the evaluation as inconclusive.
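
Before raising or lowering `overall_min`, it can help to estimate how many responses each candidate value would hold back. The sketch below is one way to do that from scores you have already logged; `share_below` is a hypothetical helper and the sample scores are made up, so the percentages it prints say nothing about real traffic. Note that "below threshold" here means "not delivered as-is": some of those responses would still go out with a disclaimer or after regeneration.

```python
def share_below(scores: list, threshold: float) -> float:
    """Fraction of scores that fall below the threshold."""
    if not scores:
        return 0.0
    return sum(1 for s in scores if s < threshold) / len(scores)


# Made-up scores standing in for a sample pulled from your audit logs
sample_scores = [9.1, 8.4, 7.8, 7.2, 6.9, 8.8, 9.4, 7.6, 8.1, 6.2]

for candidate in (7.0, 7.5, 8.0):
    share = share_below(sample_scores, candidate)
    print(f"overall_min={candidate}: {share:.0%} of sampled responses below threshold")
```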

    Testing Your Safety Pipeline

    Unit Tests

    python
    # tests/test_safety_pipeline.py
    import pytest
    from unittest.mock import MagicMock, patch
    from rail_guard import evaluate_response, RouteDecision
    from router import route_response
    from config import PROFILES
    
    
    # ----- Fixtures -----
    
    @pytest.fixture
    def general_profile():
        return PROFILES["general"]
    
    
    @pytest.fixture
    def healthcare_profile():
        return PROFILES["healthcare"]
    
    
    # ----- rail_guard.py tests -----
    
    def make_mock_result(score: float, dimensions: dict, explanations: dict = None, confidence: float = 0.9):
        result = MagicMock()
        result.rail_score = score
        result.confidence = confidence
        result.dimensions = dimensions
        result.explanations = explanations or {}
        result.request_id = "test-req-001"
        return result
    
    
    @patch("rail_guard.rail_client")
    def test_high_scoring_response_delivers(mock_client, general_profile):
        mock_client.evaluate.return_value = make_mock_result(
            score=8.5,
            dimensions={"fairness": 9.0, "safety": 9.0, "reliability": 8.0,
                        "transparency": 8.0, "privacy": 8.0, "accountability": 8.0,
                        "inclusivity": 9.0, "user_impact": 8.5},
        )
        outcome = evaluate_response("Hello", "Hi there!", general_profile)
        assert outcome.decision == RouteDecision.DELIVER
    
    
    @patch("rail_guard.rail_client")
    def test_low_scoring_response_blocks(mock_client, general_profile):
        mock_client.evaluate.return_value = make_mock_result(
            score=3.1,
            dimensions={"fairness": 3.0, "safety": 2.0, "reliability": 4.0,
                        "transparency": 3.0, "privacy": 5.0, "accountability": 3.0,
                        "inclusivity": 3.0, "user_impact": 2.5},
        )
        outcome = evaluate_response("Tell me how to...", "Here is how...", general_profile)
        assert outcome.decision == RouteDecision.BLOCK
    
    
    @patch("rail_guard.rail_client")
    def test_healthcare_safety_floor_triggers_escalation(mock_client, healthcare_profile):
        # Safety is 7.5 -- below the healthcare floor of 9.0 -- even though overall is 8.0
        mock_client.evaluate.return_value = make_mock_result(
            score=8.0,
            dimensions={"fairness": 9.0, "safety": 7.5, "reliability": 9.0,
                        "transparency": 8.0, "privacy": 8.0, "accountability": 8.5,
                        "inclusivity": 8.0, "user_impact": 7.5},
            explanations={"safety": "Response does not recommend consulting a clinician for a medical decision."},
        )
        outcome = evaluate_response("Is it safe to...", "Yes, you can...", healthcare_profile)
        assert outcome.decision == RouteDecision.ESCALATE
        assert "safety" in outcome.escalate_dimensions
    
    
    @patch("rail_guard.rail_client")
    def test_borderline_response_gets_disclaimer(mock_client, general_profile):
        # Score is 5.8 -- within 1.5 of the 7.0 threshold
        mock_client.evaluate.return_value = make_mock_result(
            score=5.8,
            dimensions={"fairness": 6.0, "safety": 7.0, "reliability": 5.5,
                        "transparency": 5.0, "privacy": 7.0, "accountability": 5.0,
                        "inclusivity": 6.0, "user_impact": 5.5},
        )
        outcome = evaluate_response("...", "...", general_profile)
        assert outcome.decision == RouteDecision.DISCLAIMER
    
    
    # ----- router.py integration tests -----
    
    @patch("router.generate_response")
    @patch("router.evaluate_response")
    def test_first_attempt_delivers(mock_eval, mock_gen, general_profile):
        mock_gen.return_value = "A perfectly fine response."
        mock_outcome = MagicMock()
        mock_outcome.decision = RouteDecision.DELIVER
        mock_outcome.rail_score = 8.2
        mock_outcome.request_id = "req-001"
        mock_eval.return_value = mock_outcome
    
        result = route_response(
            user_message="What is the speed of light?",
            conversation_history=[],
            deployment="general",
            profile=general_profile,
        )
    
        assert result["delivered"] is True
        assert result["escalated"] is False
        assert mock_gen.call_count == 1
    
    
    @patch("router.generate_response")
    @patch("router.evaluate_response")
    def test_regeneration_loop(mock_eval, mock_gen, general_profile):
        mock_gen.return_value = "A response that needs work."
    
        # First evaluation: REGENERATE. Second: DELIVER.
        regen_outcome = MagicMock()
        regen_outcome.decision = RouteDecision.REGENERATE
        regen_outcome.rail_score = 5.5
        regen_outcome.request_id = "req-001"
        regen_outcome.regeneration_hint = "reliability: Missing citation."
    
        deliver_outcome = MagicMock()
        deliver_outcome.decision = RouteDecision.DELIVER
        deliver_outcome.rail_score = 8.1
        deliver_outcome.request_id = "req-002"
    
        mock_eval.side_effect = [regen_outcome, deliver_outcome]
    
        result = route_response(
            user_message="Explain quantum entanglement.",
            conversation_history=[],
            deployment="general",
            profile=general_profile,
        )
    
        assert result["delivered"] is True
        assert mock_gen.call_count == 2  # Original + one regeneration
    

    Run the suite:

    bash
    pytest tests/ -v
    

    Monitoring and Alerting Setup

    Structured audit logs give you the raw material for monitoring. The key metrics to track in production are:

    Block rate -- The percentage of responses blocked outright. A sudden spike indicates either a change in user behavior (new use case hitting the chatbot), a prompt regression (system prompt change), or a model update from your LLM provider. Target: below 5% for general use cases.

    Escalation rate -- Percentage routed to human agents. Track this per deployment. A healthcare chatbot should have a higher escalation rate than a general assistant by design. Unexpected drops can mean your safety thresholds are too loose.

    Average RAIL score by dimension -- Breakdown by fairness, safety, reliability, etc. Dimension-level trends reveal which types of failure are increasing. If reliability starts declining after a model update, that is a signal before your block rate visibly rises.

    Regeneration success rate -- Of responses that triggered regeneration, what percentage passed on the second attempt? Low success rates mean your regeneration hints are not effective or the LLM cannot recover from the identified failure mode.
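
Three of these metrics can be computed directly from the fields that audit.py already logs (`final_decision` and `attempt_count`). The sketch below uses a hypothetical `summarize` helper and made-up entries. It counts a regeneration as successful if any retry was eventually delivered, which is slightly broader than "passed on the second attempt".

```python
DELIVERED = ("delivered", "delivered_with_disclaimer")


def summarize(entries: list) -> dict:
    """Hypothetical helper: compute rates from audit entries."""
    total = len(entries)
    if total == 0:
        return {"block_rate": 0.0, "escalation_rate": 0.0, "regen_success_rate": None}

    blocked = sum(1 for e in entries if e["final_decision"] == "blocked")
    escalated = sum(1 for e in entries if e["final_decision"] == "escalated")
    # More than one attempt means at least one regeneration happened
    regenerated = [e for e in entries if e["attempt_count"] > 1]
    recovered = [e for e in regenerated if e["final_decision"] in DELIVERED]

    return {
        "block_rate": blocked / total,
        "escalation_rate": escalated / total,
        "regen_success_rate": len(recovered) / len(regenerated) if regenerated else None,
    }


# Made-up audit entries for illustration
sample_entries = [
    {"final_decision": "delivered", "attempt_count": 1},
    {"final_decision": "delivered", "attempt_count": 1},
    {"final_decision": "delivered", "attempt_count": 2},
    {"final_decision": "blocked", "attempt_count": 3},
    {"final_decision": "escalated", "attempt_count": 1},
    {"final_decision": "delivered_with_disclaimer", "attempt_count": 2},
    {"final_decision": "delivered", "attempt_count": 1},
    {"final_decision": "delivered", "attempt_count": 1},
]

metrics = summarize(sample_entries)
print(metrics)
```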

    To wire this into your existing alerting stack, extend audit.py to write to your preferred sink and add threshold-based alerts:

    python
    # audit.py: production extension example
    import logging
    from datetime import datetime, timezone

    from google.cloud import bigquery  # or your preferred store

    logger = logging.getLogger("rail.audit")
    BQ_CLIENT = bigquery.Client()
    AUDIT_TABLE = "your-project.chatbot_audit.interactions"
    
    
    def log_interaction(session_id, user_message, final_response,
                        rail_score, delivered, escalated, request_id, audit_trail):
        row = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": session_id,
            "rail_request_id": request_id,
            "rail_score": rail_score,
            "delivered": delivered,
            "escalated": escalated,
            "deployment": audit_trail.get("deployment"),
            "attempt_count": len(audit_trail.get("attempts", [])),
            "final_decision": audit_trail.get("final_decision"),
        }
        errors = BQ_CLIENT.insert_rows_json(AUDIT_TABLE, [row])
        if errors:
            logger.error(f"BQ audit write failed: {errors}")
    

    For alerting, a daily query against your audit table works well:

    sql
    -- Alert if block rate exceeds 8% in the last 24 hours
    SELECT
      deployment,
      COUNTIF(final_decision = 'blocked') / COUNT(*) AS block_rate,
      AVG(rail_score) AS avg_rail_score,
      COUNT(*) AS total_interactions
    FROM chatbot_audit.interactions
    WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
    GROUP BY deployment
    HAVING block_rate > 0.08
    ORDER BY block_rate DESC
    

    Performance Considerations

    Latency Budget

    RAIL Score evaluation adds latency to every response. For a chat interface, the user-perceived end-to-end latency budget looks like this:

    Stage | Typical Latency
    LLM generation (gpt-4o-mini, ~300 tokens) | 800--1,800ms
    RAIL basic evaluation | 200--400ms
    RAIL deep evaluation | 600--1,200ms
    Total (basic, no regen) | 1,000--2,200ms
    Total (deep, no regen) | 1,400--3,000ms
    Total (deep, one regen) | 2,800--6,000ms

    For most chat applications, a 2--3 second total latency is acceptable. If you are building a real-time voice interface or a low-latency copilot, the two-pass strategy from the Python SDK guide applies here too.
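
    The totals in the latency table are the sum of generation and evaluation time, repeated once per regeneration. A minimal sketch of that arithmetic follows. The stage ranges come from the table, while the 3,000ms budget and the helper names are assumptions chosen for illustration.

```python
# Stage latency ranges in milliseconds (low, high), taken from the latency table.
LLM_GENERATION = (800, 1800)
RAIL_BASIC = (200, 400)
RAIL_DEEP = (600, 1200)


def total_latency(evaluation: tuple, regenerations: int = 0) -> tuple:
    """Sum generation + evaluation over the first attempt and each regeneration."""
    passes = 1 + regenerations
    low = passes * (LLM_GENERATION[0] + evaluation[0])
    high = passes * (LLM_GENERATION[1] + evaluation[1])
    return (low, high)


def fits_budget(latency_range: tuple, budget_ms: int) -> bool:
    """True only if even the slow end of the range stays within budget."""
    return latency_range[1] <= budget_ms


BUDGET_MS = 3000  # assumed budget, based on the 2--3 second guidance

for label, evaluation, regens in [
    ("basic, no regen", RAIL_BASIC, 0),
    ("deep, no regen", RAIL_DEEP, 0),
    ("deep, one regen", RAIL_DEEP, 1),
]:
    latency = total_latency(evaluation, regens)
    print(label, latency, fits_budget(latency, BUDGET_MS))
```

    Checking the slow end of each range is a deliberately conservative choice: a single regeneration with deep evaluation can exceed a 3 second budget even when the typical case does not.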

    Async Evaluation

    When your use case allows it, run RAIL evaluation concurrently with streaming the response to the user. This pattern works only if you are comfortable showing the response while evaluation completes and following up with a correction notice if it fails -- appropriate for low-risk deployments, not for healthcare or finance:

    python
    import asyncio
    import os
    
    from rail_score import AsyncRAILClient
    
    async_rail_client = AsyncRAILClient(api_key=os.environ["RAIL_API_KEY"])
    
    
    async def streaming_chat_with_background_eval(user_message: str, profile):
        """
        Stream the LLM response to the UI while evaluating in the background.
        If evaluation fails, send a correction message.
        Only use this pattern for low-risk deployments.
        """
        candidate = generate_response(user_message, [], deployment="general")
    
        # Start evaluation concurrently
        eval_task = asyncio.create_task(
            async_rail_client.evaluate(
                prompt=user_message,
                response=candidate,
                dimensions="all",
                depth="basic",
            )
        )
    
        # Stream the response (simplified -- wire to your actual streaming transport)
        yield candidate
    
        # Wait for evaluation result
        result = await eval_task
        if result.rail_score < profile.overall_min:
            yield (
                "\n\n*Please note: This response may need review. "
                "A member of our team will follow up if needed.*"
            )
    

    Caching Identical Evaluations

    If your chatbot handles repetitive queries (FAQ-style), cache RAIL evaluations by a hash of the prompt + response. The evaluation result for "What are your business hours?" and a standard response will be the same every time:

    python
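    import hashlib
    from collections import OrderedDict
    from typing import Callable
    
    _EVAL_CACHE: OrderedDict = OrderedDict()
    _MAX_ENTRIES = 1024
    
    
    def get_cache_key(prompt: str, response: str) -> tuple[str, str]:
        """Hash prompt and response so raw user text is not stored as a key."""
        return (
            hashlib.sha256(prompt.encode()).hexdigest()[:16],
            hashlib.sha256(response.encode()).hexdigest()[:16],
        )
    
    
    def cached_evaluate(prompt: str, response: str, deployment: str,
                        evaluate_fn: Callable[[str, str], dict]) -> dict:
        """Cache RAIL evaluations for identical prompt+response pairs.
    
        evaluate_fn is an assumption: pass whatever callable runs the real
        RAIL evaluation in your pipeline and returns its result.
        """
        key = (*get_cache_key(prompt, response), deployment)
        if key in _EVAL_CACHE:
            _EVAL_CACHE.move_to_end(key)       # mark as recently used
            return _EVAL_CACHE[key]
        result = evaluate_fn(prompt, response)
        _EVAL_CACHE[key] = result
        if len(_EVAL_CACHE) > _MAX_ENTRIES:
            _EVAL_CACHE.popitem(last=False)    # evict the least recently used entry
        return result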
    

    For distributed deployments, move the cache to Redis with a TTL of 24 hours. Evaluation results are deterministic for identical inputs, so cache invalidation is only needed when you change your deployment profile or RAIL model version.
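
    One way to avoid manual invalidation is to fold the deployment profile version and RAIL model version into the cache key, so that changing either simply produces new keys. The sketch below assumes hypothetical profile_version and rail_model_version strings that you maintain yourself, and uses a small in-memory class as a stand-in for Redis. In a real deployment the get and set calls would map to your Redis client's read and write-with-expiry operations.

```python
import hashlib
import json
import time
from typing import Optional

CACHE_TTL_SECONDS = 24 * 60 * 60  # the 24-hour TTL suggested in the text


def versioned_cache_key(prompt: str, response: str, deployment: str,
                        profile_version: str, rail_model_version: str) -> str:
    """Build a key that changes whenever the profile or model version changes."""
    payload = json.dumps(
        [prompt, response, deployment, profile_version, rail_model_version]
    )
    return "rail-eval:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


class InMemoryTTLCache:
    """Stand-in for Redis: values expire ttl seconds after being set."""

    def __init__(self, ttl: float = CACHE_TTL_SECONDS, clock=time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._store = {}

    def set(self, key: str, value: dict) -> None:
        self._store[key] = (self._clock() + self._ttl, value)

    def get(self, key: str) -> Optional[dict]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # expired: drop it and report a miss
            return None
        return value


# Illustrative values only: the response text, versions, and score are made up.
cache = InMemoryTTLCache()
key_v1 = versioned_cache_key("What are your business hours?",
                             "We are open 9 to 5.", "general", "v1", "rail-1")
key_v2 = versioned_cache_key("What are your business hours?",
                             "We are open 9 to 5.", "general", "v2", "rail-1")
cache.set(key_v1, {"rail_score": 9.1})
print(cache.get(key_v1))  # hit
print(cache.get(key_v2))  # miss, because the profile version changed
```

    With versioned keys, stale entries are never read after a profile or model change and simply age out under the TTL, so no explicit purge step is needed.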

    Real-World Results

    To illustrate the value of the five-stage pipeline, here are representative outcomes from a customer support chatbot deployment across a 30-day production window:

    Metric | Value
    Total interactions evaluated | 142,300
    Responses delivered on first attempt | 87.4%
    Responses delivered after regeneration | 6.1%
    Responses delivered with disclaimer | 3.2%
    Responses escalated to human agent | 1.8%
    Responses blocked (safe fallback) | 1.5%
    Average RAIL score (delivered responses) | 8.3 / 10
    Average RAIL score (blocked responses) | 3.6 / 10
    Evaluation latency p50 | 410ms
    Evaluation latency p95 | 980ms

    The regeneration pass recovered 6.1% of interactions that would have been blocked without it -- meaning over 8,600 interactions in this window received a useful response on the second attempt rather than a fallback message. The escalation rate of 1.8% met the pre-launch target of less than 2%, keeping human review volume manageable.
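
    The absolute counts behind these percentages follow from the total interaction volume. A short sketch of the arithmetic, using only the figures from the results table (the variable names are illustrative):

```python
TOTAL_INTERACTIONS = 142_300

# Outcome shares from the results table, in percent.
OUTCOME_SHARES = {
    "first_attempt": 87.4,
    "after_regeneration": 6.1,
    "with_disclaimer": 3.2,
    "escalated": 1.8,
    "blocked": 1.5,
}

# Approximate absolute counts implied by each percentage.
outcome_counts = {
    name: round(TOTAL_INTERACTIONS * share / 100)
    for name, share in OUTCOME_SHARES.items()
}

for name, count in outcome_counts.items():
    print(f"{name}: ~{count:,}")
```

    The regeneration row works out to roughly 8,680 interactions, which is where the "over 8,600" figure comes from.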

    The dimension breakdown for blocked responses showed safety (avg 2.1) and reliability (avg 3.4) as the most common failure dimensions -- consistent with the types of questions that tend to produce hallucinated or unsafe responses from general-purpose LLMs.

    Conclusion

    You have built a production-ready ethics-aware chatbot with a five-stage evaluation pipeline, configurable threshold profiles per deployment context, automated regeneration with targeted feedback, human escalation for critical dimension failures, and structured audit logging for compliance and monitoring.

    The architecture is framework-agnostic -- swap OpenAI for Anthropic or Gemini in chatbot.py and the rest of the pipeline is unchanged. The threshold profiles make it straightforward to deploy the same codebase across multiple risk contexts, from a general assistant to a regulated healthcare application.

    Start with shadow mode. Before gating any live traffic, run the RAIL evaluation in parallel with your existing chatbot for a week. Log the scores without acting on them. This gives you a real score distribution for your use case and lets you set thresholds based on actual data before any user ever sees a blocked response.
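
    Once shadow mode has produced a log of scores, a threshold can be derived from the distribution rather than guessed. The sketch below is one possible approach and is not part of the RAIL SDK: it picks the highest threshold that would block no more than a chosen fraction of the logged responses. The function names are hypothetical and the shadow_scores values are made-up sample data.

```python
import math
from typing import Sequence


def threshold_for_block_rate(scores: Sequence[float], target_block_rate: float) -> float:
    """Return a threshold such that at most target_block_rate of the logged
    scores fall strictly below it."""
    if not scores:
        raise ValueError("need at least one logged score")
    if not 0.0 <= target_block_rate < 1.0:
        raise ValueError("target_block_rate must be in [0, 1)")
    ordered = sorted(scores)
    # Number of responses we are willing to block at this rate.
    allowed_blocks = math.floor(target_block_rate * len(ordered))
    # Using the score at this index as the threshold blocks only the
    # entries strictly below it, which is at most allowed_blocks items.
    return ordered[allowed_blocks]


def observed_block_rate(scores: Sequence[float], threshold: float) -> float:
    """Fraction of logged scores that a given threshold would have blocked."""
    return sum(score < threshold for score in scores) / len(scores)


# Made-up sample of shadow-mode scores on a 0-10 scale.
shadow_scores = [9.1, 8.7, 8.4, 7.9, 8.8, 3.2, 9.0, 6.5, 8.2, 8.9]
threshold = threshold_for_block_rate(shadow_scores, 0.10)
print(threshold, observed_block_rate(shadow_scores, threshold))
```

    Running observed_block_rate against several candidate thresholds on the same log is a quick way to see how sensitive the block rate is before any live traffic is gated.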

    For the full RAIL Score API reference, SDK documentation, and compliance reporting endpoints, visit docs.responsibleailabs.ai.
