AWS Bedrock Knowledge Base

AWS Bedrock Flows Guide

Overview

Amazon Bedrock Flows (formerly called Prompt Flows) is a visual builder for creating generative AI workflows by connecting foundation models, prompts, and data sources with little or no code. Flows let you orchestrate multi-step AI tasks with conditional logic, data transformations, and integrations with other AWS services.

Key Benefits:

- Visual workflow builder (low-code/no-code)
- Chain multiple AI operations together
- Add conditional logic and branching
- Integrate with AWS services (Lambda, S3, DynamoDB)
- Version control and deployment management
- Built-in testing and debugging

What Are Bedrock Flows?

Bedrock Flows are directed acyclic graphs (DAGs) that define how data moves through a series of nodes:

Input → Prompt Node → Model Node → Condition Node → Output
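
Because a flow is a DAG, a definition can be sanity-checked locally before it is sent to the service. The sketch below is a minimal illustration, assuming a definition shaped like the dictionaries used later in this guide (a `nodes` list plus a `connections` list with `source` and `target` names). `find_flow_problems` is a hypothetical helper, not part of any AWS SDK, and it does not replace the validation Bedrock performs when a flow is prepared.

```python
from collections import defaultdict, deque


def find_flow_problems(definition):
    """Return a list of structural problems found in a flow definition dict."""
    node_names = {node["name"] for node in definition.get("nodes", [])}
    problems = []
    successors = defaultdict(list)
    indegree = {name: 0 for name in node_names}

    for conn in definition.get("connections", []):
        source, target = conn["source"], conn["target"]
        missing = [n for n in (source, target) if n not in node_names]
        if missing:
            problems.append(f"{conn['name']}: unknown node(s) {missing}")
            continue
        successors[source].append(target)
        indegree[target] += 1

    # Kahn's algorithm: repeatedly remove nodes that have no incoming edges.
    queue = deque(name for name, degree in indegree.items() if degree == 0)
    visited = 0
    while queue:
        current = queue.popleft()
        visited += 1
        for nxt in successors[current]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)

    # Any node never reached with indegree 0 sits on (or behind) a cycle.
    if visited != len(node_names):
        problems.append("cycle detected: the flow is not a DAG")
    return problems


example = {
    "nodes": [{"name": "FlowInput"}, {"name": "SummarizePrompt"}, {"name": "FlowOutput"}],
    "connections": [
        {"name": "c1", "source": "FlowInput", "target": "SummarizePrompt"},
        {"name": "c2", "source": "SummarizePrompt", "target": "FlowOutput"},
    ],
}
print(find_flow_problems(example))
```

An empty list means the connections reference only declared nodes and contain no cycle.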

Use Cases:

- Multi-step content generation
- Document processing pipelines
- Conversational AI with context
- Data extraction and transformation
- Content moderation workflows
- Automated report generation

Key Concepts

Flow

A complete workflow definition that includes nodes, connections, and configuration.

Nodes

Building blocks of a flow. Types include:

- Input Node: Entry point for data
- Prompt Node: Defines prompts with variables
- Model Node: Invokes foundation models
- Condition Node: Adds branching logic
- Lambda Node: Executes custom code
- Knowledge Base Node: Queries knowledge bases (RAG)
- Output Node: Returns final results

Connections

Links between nodes that define data flow and execution order.

Variables

Placeholders in prompts that get replaced with actual values at runtime.
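
To make the substitution concrete, the sketch below mimics locally how `{{name}}` placeholders could be filled in. It illustrates the idea only and is not how Bedrock implements it; `render_template` and `template_variables` are hypothetical helper names, and the regular expression assumes variable names made of letters, digits, and underscores.

```python
import re

# Matches {{name}} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(template, values):
    """Replace each {{name}} placeholder with values[name]; fail on missing names."""
    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value supplied for variable '{name}'")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


def template_variables(template):
    """List the distinct variable names a template expects, in order of appearance."""
    return list(dict.fromkeys(PLACEHOLDER.findall(template)))


template = "Translate the following text to {{target_language}}:\n\n{{text}}"
print(template_variables(template))
print(render_template(template, {"target_language": "French", "text": "Good morning"}))
```

Listing a template's variables up front is a cheap way to check that every placeholder has a matching node input before the flow is created.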

Aliases

Versioned deployments of flows for production use.

Prerequisites

AWS Account Requirements:

- AWS account with Bedrock access
- IAM permissions for Bedrock Flows
- Model access enabled for desired foundation models

Required IAM Permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:CreateFlow",
        "bedrock:UpdateFlow",
        "bedrock:DeleteFlow",
        "bedrock:GetFlow",
        "bedrock:ListFlows",
        "bedrock:PrepareFlow",
        "bedrock:CreateFlowVersion",
        "bedrock:CreateFlowAlias",
        "bedrock:InvokeFlow"
      ],
      "Resource": "*"
    }
  ]
}

SDK Installation:

# Python
pip install boto3

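# Node.js (client-bedrock-agent builds flows; client-bedrock-agent-runtime invokes them)
npm install @aws-sdk/client-bedrock-agent @aws-sdk/client-bedrock-agent-runtime

# AWS CLI (sets credentials and the default region)
aws configure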

Getting Started

Option 1: Using AWS Console (Visual Builder)

  1. Navigate to Bedrock Console

    • Go to AWS Console → Amazon Bedrock → Flows
    • Click "Create flow"
  2. Configure Flow

    • Name: my-first-flow
    • Description: "Simple text generation flow"
    • Service role: Create or select existing role
  3. Build Flow Visually

    • Drag and drop nodes onto canvas
    • Connect nodes by clicking and dragging
    • Configure each node's properties
    • Test the flow
  4. Deploy Flow

    • Create a version
    • Create an alias (e.g., "production")
    • Invoke via API

Option 2: Using AWS SDK (Programmatic)

Create flows programmatically with boto3 or another AWS SDK. The examples in this guide use boto3.

Flow Components

1. Input Node

Defines the entry point and expected input structure.

{
  "type": "Input",
  "name": "FlowInput",
  "outputs": [
    {
      "name": "document",
      "type": "String"
    }
  ]
}

2. Prompt Node

Defines prompts with variables that will be filled at runtime.

{
  "type": "Prompt",
  "name": "SummarizePrompt",
  "configuration": {
    "promptTemplate": "Summarize the following document in 3 bullet points:\n\n{{document}}"
  },
  "inputs": [
    {
      "name": "document",
      "type": "String",
      "expression": "$.data.document"
    }
  ]
}

3. Model Node

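Invokes a foundation model with the prompt. In the flow definitions accepted by create_flow, this step is part of the Prompt node, which calls the model and returns its completion (the modelCompletion output in Example 1), so read the snippet below as a conceptual view rather than a separate node to declare.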

{
  "type": "Model",
  "name": "ClaudeModel",
  "configuration": {
    "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
    "inferenceConfiguration": {
      "temperature": 0.7,
      "topP": 0.9,
      "maxTokens": 2048
    }
  },
  "inputs": [
    {
      "name": "prompt",
      "type": "String",
      "expression": "$.data.SummarizePrompt"
    }
  ]
}

4. Condition Node

Adds branching logic based on conditions.

{
  "type": "Condition",
  "name": "CheckLength",
  "configuration": {
    "conditions": [
      {
        "name": "IsLong",
        "expression": "$.data.document.length > 1000"
      },
      {
        "name": "IsShort",
        "expression": "$.data.document.length <= 1000"
      }
    ]
  }
}

5. Output Node

Returns the final result.

{
  "type": "Output",
  "name": "FlowOutput",
  "inputs": [
    {
      "name": "summary",
      "type": "String",
      "expression": "$.data.ClaudeModel"
    }
  ]
}

Creating Your First Flow

Example 1: Simple Text Summarization Flow

Flow Structure:

Input (document) → Prompt → Model (Claude) → Output (summary)

Step-by-Step Implementation:

Step 1: Create Flow Definition

import boto3
import json

bedrock_agent = boto3.client('bedrock-agent', region_name='us-east-1')

flow_definition = {
    "nodes": [
        {
            "name": "FlowInput",
            "type": "Input",
            "configuration": {
                "input": {}
            },
            "outputs": [
                {
                    "name": "document",
                    "type": "String"
                }
            ]
        },
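        {
            "name": "SummarizePrompt",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            # An inline prompt must name the model it invokes
                            "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
                            "templateType": "TEXT",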
                            "templateConfiguration": {
                                "text": {
                                    "text": "Summarize this document in 3 bullet points:\n\n{{document}}"
                                }
                            },
                            "inferenceConfiguration": {
                                "text": {
                                    "temperature": 0.7,
                                    "topP": 0.9,
                                    "maxTokens": 500
                                }
                            }
                        }
                    }
                }
            },
            "inputs": [
                {
                    "name": "document",
                    "type": "String",
                    # The JSONPath is evaluated against the value arriving on this input
                    "expression": "$.data"
                }
            ],
            "outputs": [
                {
                    "name": "modelCompletion",
                    "type": "String"
                }
            ]
        },
        {
            "name": "FlowOutput",
            "type": "Output",
            "configuration": {
                "output": {}
            },
            "inputs": [
                {
                    "name": "document",
                    "type": "String",
                    # The JSONPath is evaluated against the value arriving on this input
                    "expression": "$.data"
                }
            ]
        }
    ],
    "connections": [
        {
            "name": "Connection1",
            "source": "FlowInput",
            "target": "SummarizePrompt",
            "type": "Data",
            "configuration": {
                "data": {
                    "sourceOutput": "document",
                    "targetInput": "document"
                }
            }
        },
        {
            "name": "Connection2",
            "source": "SummarizePrompt",
            "target": "FlowOutput",
            "type": "Data",
            "configuration": {
                "data": {
                    "sourceOutput": "modelCompletion",
                    "targetInput": "document"
                }
            }
        }
    ]
}

Step 2: Create the Flow

# Create the flow
response = bedrock_agent.create_flow(
    name='document-summarizer',
    description='Summarizes documents using Claude',
    executionRoleArn='arn:aws:iam::YOUR_ACCOUNT:role/BedrockFlowRole',
    definition=flow_definition
)

flow_id = response['id']
print(f"Flow created: {flow_id}")

Step 3: Prepare the Flow

# Prepare the flow (validates and makes it ready)
prepare_response = bedrock_agent.prepare_flow(
    flowIdentifier=flow_id
)

print(f"Flow status: {prepare_response['status']}")
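
Preparation may not have finished by the time `prepare_flow` returns, so the status printed above can still be `Preparing`. A minimal polling sketch follows, assuming the `get_flow` response carries a `status` field with the values `Preparing`, `Prepared`, and `Failed`. `wait_until_prepared` is a hypothetical helper name, and the poll interval and timeout are arbitrary defaults.

```python
import time


def wait_until_prepared(client, flow_id, poll_seconds=2.0, timeout_seconds=120.0):
    """Poll get_flow until the flow is Prepared, fails, or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_flow(flowIdentifier=flow_id)["status"]
        if status == "Prepared":
            return status
        if status == "Failed":
            raise RuntimeError(f"Flow {flow_id} failed to prepare")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Flow {flow_id} still '{status}' after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

With the client from Step 1, calling `wait_until_prepared(bedrock_agent, flow_id)` before Step 4 avoids creating a version from a flow that is not ready yet.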

Step 4: Create a Flow Version

# Create a version for deployment
version_response = bedrock_agent.create_flow_version(
    flowIdentifier=flow_id,
    description='Initial version'
)

flow_version = version_response['version']
print(f"Flow version created: {flow_version}")

Step 5: Create an Alias

# Create an alias pointing to the version
alias_response = bedrock_agent.create_flow_alias(
    flowIdentifier=flow_id,
    name='production',
    description='Production alias',
    routingConfiguration=[
        {
            'flowVersion': flow_version
        }
    ]
)

alias_id = alias_response['id']
print(f"Alias created: {alias_id}")

Simple Examples

Example 2: Content Moderation Flow

Flow Structure:

Input (text) → Moderation Prompt (model answers SAFE or UNSAFE) → Output

Implementation:

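moderation_flow = {
    "nodes": [
        {
            "name": "Input",
            "type": "Input",
            "configuration": {"input": {}},
            # The flow input node exposes a single output named "document"
            "outputs": [{"name": "document", "type": "String"}]
        },
        {
            "name": "ModerationPrompt",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            # An inline prompt must name the model it invokes
                            "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": """Analyze this text for inappropriate content.
Respond with only 'SAFE' or 'UNSAFE':

{{text}}"""
                                }
                            },
                            "inferenceConfiguration": {
                                "text": {
                                    "temperature": 0.1,
                                    "maxTokens": 10
                                }
                            }
                        }
                    }
                }
            },
            "inputs": [
                # "$.data" is the value arriving on this input
                {"name": "text", "type": "String", "expression": "$.data"}
            ],
            "outputs": [{"name": "modelCompletion", "type": "String"}]
        },
        {
            "name": "Output",
            "type": "Output",
            "configuration": {"output": {}},
            # The flow output node takes a single input named "document"
            "inputs": [
                {"name": "document", "type": "String", "expression": "$.data"}
            ]
        }
    ],
    "connections": [
        {
            "name": "InputToPrompt",
            "source": "Input",
            "target": "ModerationPrompt",
            "type": "Data",
            "configuration": {
                "data": {"sourceOutput": "document", "targetInput": "text"}
            }
        },
        {
            "name": "PromptToOutput",
            "source": "ModerationPrompt",
            "target": "Output",
            "type": "Data",
            "configuration": {
                "data": {"sourceOutput": "modelCompletion", "targetInput": "document"}
            }
        }
    ]
}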

# Create the flow
response = bedrock_agent.create_flow(
    name='content-moderator',
    description='Moderates user-generated content',
    executionRoleArn='arn:aws:iam::YOUR_ACCOUNT:role/BedrockFlowRole',
    definition=moderation_flow
)

Example 3: Multi-Step Translation Flow

Flow Structure:

Input (text, target_language) → Translation Prompt → Model → Quality Check Prompt → Model → Output

Implementation:

translation_flow = {
    "nodes": [
        {
            "name": "Input",
            "type": "Input",
            "outputs": [
                {"name": "text", "type": "String"},
                {"name": "target_language", "type": "String"}
            ]
        },
        {
            "name": "TranslatePrompt",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": "Translate the following text to {{target_language}}:\n\n{{text}}"
                                }
                            },
                            "inferenceConfiguration": {
                                "text": {
                                    "temperature": 0.3,
                                    "maxTokens": 2000
                                }
                            }
                        }
                    }
                }
            },
            "inputs": [
                {
                    "name": "text",
                    "type": "String",
                    "expression": "$.data.Input.text"
                },
                {
                    "name": "target_language",
                    "type": "String",
                    "expression": "$.data.Input.target_language"
                }
            ]
        },
        {
            "name": "QualityCheckPrompt",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": """Review this translation for accuracy and fluency.
Original: {{original}}
Translation: {{translation}}

Provide a quality score (1-10) and brief feedback."""
                                }
                            },
                            "inferenceConfiguration": {
                                "text": {
                                    "temperature": 0.5,
                                    "maxTokens": 500
                                }
                            }
                        }
                    }
                }
            },
            "inputs": [
                {
                    "name": "original",
                    "type": "String",
                    "expression": "$.data.Input.text"
                },
                {
                    "name": "translation",
                    "type": "String",
                    "expression": "$.data.TranslatePrompt.modelCompletion"
                }
            ]
        },
        {
            "name": "Output",
            "type": "Output",
            "inputs": [
                {
                    "name": "translation",
                    "type": "String",
                    "expression": "$.data.TranslatePrompt.modelCompletion"
                },
                {
                    "name": "quality_review",
                    "type": "String",
                    "expression": "$.data.QualityCheckPrompt.modelCompletion"
                }
            ]
        }
    ],
    "connections": [
        {
            "name": "InputToTranslate",
            "source": "Input",
            "target": "TranslatePrompt",
            "type": "Data"
        },
        {
            "name": "TranslateToQuality",
            "source": "TranslatePrompt",
            "target": "QualityCheckPrompt",
            "type": "Data"
        },
        {
            "name": "QualityToOutput",
            "source": "QualityCheckPrompt",
            "target": "Output",
            "type": "Data"
        }
    ]
}
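
A connection that carries data also needs a `configuration.data` block naming the source output and the target input, as in Example 1. Writing that block out for every edge is repetitive, so a small builder can keep definitions compact. The sketch below assumes the connection shape shown in Example 1; `data_connection` is a hypothetical helper name, not an SDK function.

```python
def data_connection(name, source, source_output, target, target_input):
    """Build a Data connection entry in the shape used by Example 1."""
    return {
        "name": name,
        "source": source,
        "target": target,
        "type": "Data",
        "configuration": {
            "data": {
                "sourceOutput": source_output,
                "targetInput": target_input,
            }
        },
    }


# Two of the edges from the translation flow, written with the helper.
connections = [
    data_connection("TranslateToQuality", "TranslatePrompt", "modelCompletion",
                    "QualityCheckPrompt", "translation"),
    data_connection("QualityToOutput", "QualityCheckPrompt", "modelCompletion",
                    "Output", "quality_review"),
]
print(connections[0]["configuration"]["data"])
```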

Advanced Flow Patterns

Pattern 1: Conditional Branching

Route data based on conditions:

conditional_flow = {
    "nodes": [
        {
            "name": "Input",
            "type": "Input",
            "outputs": [{"name": "document_length", "type": "Number"}]
        },
        {
            "name": "LengthCheck",
            "type": "Condition",
            "configuration": {
                "condition": {
                    "conditions": [
                        {
                            # Condition expressions refer to the node's inputs by name
                            "name": "IsLong",
                            "expression": "document_length > 1000"
                        },
                        {
                            "name": "IsShort",
                            "expression": "document_length <= 1000"
                        }
                    ]
                }
            },
            "inputs": [
                {
                    "name": "document_length",
                    "type": "Number",
                    "expression": "$.data.Input.document_length"
                }
            ]
        },
        {
            "name": "DetailedSummaryPrompt",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": "Provide a detailed summary with key points..."
                                }
                            }
                        }
                    }
                }
            }
        },
        {
            "name": "QuickSummaryPrompt",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": "Provide a brief one-sentence summary..."
                                }
                            }
                        }
                    }
                }
            }
        }
    ],
    "connections": [
        {
            "name": "InputToCondition",
            "source": "Input",
            "target": "LengthCheck",
            "type": "Data"
        },
        {
            "name": "LongPath",
            "source": "LengthCheck",
            "target": "DetailedSummaryPrompt",
            "type": "Conditional",
            "configuration": {
                "conditional": {
                    "condition": "IsLong"
                }
            }
        },
        {
            "name": "ShortPath",
            "source": "LengthCheck",
            "target": "QuickSummaryPrompt",
            "type": "Conditional",
            "configuration": {
                "conditional": {
                    "condition": "IsShort"
                }
            }
        }
    ]
}

Pattern 2: Lambda Integration

Execute custom code within a flow:

# First, create a Lambda function
lambda_function_code = """
def lambda_handler(event, context):
    # Bedrock Flows delivers the node's inputs as a list under event['node']['inputs']
    inputs = {item['name']: item['value'] for item in event['node']['inputs']}
    text = inputs.get('text', '')

    # Example: Extract metadata
    word_count = len(text.split())
    char_count = len(text)

    # Return a plain dict so downstream nodes can read its fields directly
    return {
        'word_count': word_count,
        'char_count': char_count,
        'processed': True
    }
"""

# Flow with Lambda node
lambda_flow = {
    "nodes": [
        {
            "name": "Input",
            "type": "Input",
            "outputs": [{"name": "text", "type": "String"}]
        },
        {
            "name": "ProcessWithLambda",
            "type": "LambdaFunction",
            "configuration": {
                "lambdaFunction": {
                    "lambdaArn": "arn:aws:lambda:us-east-1:ACCOUNT:function:text-processor"
                }
            },
            "inputs": [
                {
                    "name": "text",
                    "type": "String",
                    "expression": "$.data.Input.text"
                }
            ]
        },
        {
            "name": "GenerateReport",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": """Generate a report based on this analysis:
Word count: {{word_count}}
Character count: {{char_count}}

Provide insights about the text length and complexity."""
                                }
                            }
                        }
                    }
                }
            },
            "inputs": [
                {
                    "name": "word_count",
                    "type": "Number",
                    "expression": "$.data.ProcessWithLambda.word_count"
                },
                {
                    "name": "char_count",
                    "type": "Number",
                    "expression": "$.data.ProcessWithLambda.char_count"
                }
            ]
        },
        {
            "name": "Output",
            "type": "Output",
            "inputs": [
                {
                    "name": "report",
                    "type": "String",
                    "expression": "$.data.GenerateReport.modelCompletion"
                }
            ]
        }
    ]
}

Pattern 3: Knowledge Base Integration (RAG)

Integrate with Bedrock Knowledge Bases for retrieval-augmented generation:

rag_flow = {
    "nodes": [
        {
            "name": "Input",
            "type": "Input",
            "outputs": [{"name": "query", "type": "String"}]
        },
        {
            "name": "RetrieveFromKB",
            "type": "KnowledgeBase",
            "configuration": {
                "knowledgeBase": {
                    "knowledgeBaseId": "YOUR_KB_ID",
                    "retrievalConfiguration": {
                        "vectorSearchConfiguration": {
                            "numberOfResults": 5
                        }
                    }
                }
            },
            "inputs": [
                {
                    "name": "query",
                    "type": "String",
                    "expression": "$.data.Input.query"
                }
            ]
        },
        {
            "name": "GenerateAnswer",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": """Based on the following context, answer the question.

Context:
{{context}}

Question: {{query}}

Answer:"""
                                }
                            },
                            "inferenceConfiguration": {
                                "text": {
                                    "temperature": 0.5,
                                    "maxTokens": 1000
                                }
                            }
                        }
                    }
                }
            },
            "inputs": [
                {
                    "name": "context",
                    "type": "String",
                    "expression": "$.data.RetrieveFromKB.retrievalResults"
                },
                {
                    "name": "query",
                    "type": "String",
                    "expression": "$.data.Input.query"
                }
            ]
        },
        {
            "name": "Output",
            "type": "Output",
            "inputs": [
                {
                    "name": "answer",
                    "type": "String",
                    "expression": "$.data.GenerateAnswer.modelCompletion"
                },
                {
                    "name": "sources",
                    "type": "Array",
                    "expression": "$.data.RetrieveFromKB.retrievalResults"
                }
            ]
        }
    ]
}

Invoking Flows

Method 1: Using Boto3 (Python)

import boto3
import json

# Create runtime client
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name='us-east-1')

def invoke_flow(flow_id, flow_alias_id, input_data):
    """
    Invoke a Bedrock Flow

    Args:
        flow_id: The flow identifier
        flow_alias_id: The alias identifier
        input_data: Value for the flow input node (a string when the input is String-typed)

    Returns:
        Flow execution result
    """
    response = bedrock_agent_runtime.invoke_flow(
        flowIdentifier=flow_id,
        flowAliasIdentifier=flow_alias_id,
        inputs=[
            {
                'content': {
                    'document': input_data
                },
                'nodeName': 'FlowInput',
                'nodeOutputName': 'document'
            }
        ]
    )

    # Process the response stream
    result = []
    for event in response['responseStream']:
        if 'flowOutputEvent' in event:
            output = event['flowOutputEvent']
            result.append(output)
        elif 'flowCompletionEvent' in event:
            completion = event['flowCompletionEvent']
            print(f"Flow completed: {completion['completionReason']}")

    return result

# Example usage
flow_result = invoke_flow(
    flow_id='YOUR_FLOW_ID',
    flow_alias_id='YOUR_ALIAS_ID',
    # Passed as-is to the String-typed "document" input; invoke_flow wraps it in 'content'
    input_data='This is a long document that needs to be summarized...'
)

print(json.dumps(flow_result, indent=2))
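
The list returned by `invoke_flow` above holds raw `flowOutputEvent` dictionaries. A minimal sketch of reducing a stream to the parts most callers need is shown below. It assumes each output event carries `nodeName` and `content['document']`, matching the fields read elsewhere in this guide; `summarize_flow_events` is a hypothetical helper, and the sample events are hand-written stand-ins for `response['responseStream']`.

```python
def summarize_flow_events(events):
    """Group output documents by node name and capture the completion reason."""
    outputs = {}
    completion_reason = None
    for event in events:
        if "flowOutputEvent" in event:
            output = event["flowOutputEvent"]
            outputs[output["nodeName"]] = output["content"]["document"]
        elif "flowCompletionEvent" in event:
            completion_reason = event["flowCompletionEvent"]["completionReason"]
    return {"outputs": outputs, "completion_reason": completion_reason}


# Hand-written events standing in for a real response stream.
sample_events = [
    {"flowOutputEvent": {"nodeName": "FlowOutput", "nodeType": "FlowOutputNode",
                         "content": {"document": "- point one\n- point two"}}},
    {"flowCompletionEvent": {"completionReason": "SUCCESS"}},
]
print(summarize_flow_events(sample_events))
```

Returning the completion reason alongside the outputs lets the caller distinguish a finished flow from one that stopped early.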

Method 2: Streaming Response

def invoke_flow_streaming(flow_id, flow_alias_id, input_data):
    """
    Invoke flow with streaming response
    """
    response = bedrock_agent_runtime.invoke_flow(
        flowIdentifier=flow_id,
        flowAliasIdentifier=flow_alias_id,
        inputs=[
            {
                'content': {
                    'document': input_data
                },
                'nodeName': 'FlowInput',
                'nodeOutputName': 'document'
            }
        ],
        enableTrace=True  # Without this, no flowTraceEvent entries are returned
    )

    # Stream the response
    for event in response['responseStream']:
        if 'flowOutputEvent' in event:
            output_event = event['flowOutputEvent']

            # Extract content
            if 'content' in output_event:
                content = output_event['content']
                if 'document' in content:
                    print(content['document'], end='', flush=True)

        elif 'flowTraceEvent' in event:
            # Trace information for debugging; the node name sits inside the
            # nested trace entry (for example nodeInputTrace or nodeOutputTrace)
            trace = event['flowTraceEvent'].get('trace', {})
            for trace_type, detail in trace.items():
                print(f"\n[Trace] {trace_type}: {detail.get('nodeName')}")

        elif 'flowCompletionEvent' in event:
            completion = event['flowCompletionEvent']
            print(f"\n\n[Completed] Reason: {completion['completionReason']}")

# Usage
invoke_flow_streaming(
    flow_id='YOUR_FLOW_ID',
    flow_alias_id='YOUR_ALIAS_ID',  # the alias ID (or ARN), not the alias name
    input_data='Analyze this content...'
)

Method 3: Complete Flow Invocation Class

import boto3
import json
from typing import Dict, Any, List

class BedrockFlowClient:
    """
    Helper class for working with Bedrock Flows
    """

    def __init__(self, region_name='us-east-1'):
        self.agent_client = boto3.client('bedrock-agent', region_name=region_name)
        self.runtime_client = boto3.client('bedrock-agent-runtime', region_name=region_name)

    def create_flow(self, name: str, description: str, definition: Dict, role_arn: str):
        """Create a new flow"""
        response = self.agent_client.create_flow(
            name=name,
            description=description,
            executionRoleArn=role_arn,
            definition=definition
        )
        return response['id']

    def prepare_flow(self, flow_id: str):
        """Prepare flow for execution"""
        response = self.agent_client.prepare_flow(flowIdentifier=flow_id)
        return response['status']

    def create_version(self, flow_id: str, description: str = None):
        """Create a flow version"""
        params = {'flowIdentifier': flow_id}
        if description:
            params['description'] = description

        response = self.agent_client.create_flow_version(**params)
        return response['version']

    def create_alias(self, flow_id: str, alias_name: str, flow_version: str):
        """Create an alias for a flow version"""
        response = self.agent_client.create_flow_alias(
            flowIdentifier=flow_id,
            name=alias_name,
            routingConfiguration=[
                {'flowVersion': flow_version}
            ]
        )
        return response['id']

    def invoke(self, flow_id: str, alias_id: str, inputs: Dict[str, Any]) -> Dict:
        """
        Invoke a flow and return the complete result

        Args:
            flow_id: Flow identifier
            alias_id: Alias identifier
            inputs: Dictionary of input values

        Returns:
            Dictionary containing the flow output
        """
        # Format inputs for the API
        formatted_inputs = []
        for node_name, node_data in inputs.items():
            for output_name, value in node_data.items():
                formatted_inputs.append({
                    'content': {'document': value},
                    'nodeName': node_name,
                    'nodeOutputName': output_name
                })

        response = self.runtime_client.invoke_flow(
            flowIdentifier=flow_id,
            flowAliasIdentifier=alias_id,
            inputs=formatted_inputs
        )

        # Collect results
        results = []
        traces = []

        for event in response['responseStream']:
            if 'flowOutputEvent' in event:
                results.append(event['flowOutputEvent'])
            elif 'flowTraceEvent' in event:
                traces.append(event['flowTraceEvent'])

        return {
            'outputs': results,
            'traces': traces
        }

    def list_flows(self):
        """List all flows"""
        response = self.agent_client.list_flows()
        return response['flowSummaries']

    def delete_flow(self, flow_id: str):
        """Delete a flow"""
        self.agent_client.delete_flow(flowIdentifier=flow_id)

# Usage Example
client = BedrockFlowClient()

# Create and deploy a flow
flow_id = client.create_flow(
    name='my-summarizer',
    description='Document summarization flow',
    definition=flow_definition,
    role_arn='arn:aws:iam::ACCOUNT:role/BedrockFlowRole'
)

client.prepare_flow(flow_id)
version = client.create_version(flow_id, 'v1.0')
alias_id = client.create_alias(flow_id, 'production', version)

# Invoke the flow
result = client.invoke(
    flow_id=flow_id,
    alias_id=alias_id,
    inputs={
        'FlowInput': {
            'document': 'Your document text here...'
        }
    }
)

print(json.dumps(result, indent=2))
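
The `list_flows` method above returns only the first page of results. A hedged sketch of following the pagination token is shown below, assuming the `list_flows` response includes `flowSummaries` and, when more pages exist, a `nextToken`. `iter_flow_summaries` is a hypothetical helper, and the page size is an arbitrary choice.

```python
def iter_flow_summaries(agent_client, page_size=50):
    """Yield every flow summary, following nextToken across pages."""
    kwargs = {"maxResults": page_size}
    while True:
        page = agent_client.list_flows(**kwargs)
        yield from page.get("flowSummaries", [])
        token = page.get("nextToken")
        if not token:
            break
        # Ask for the page that starts where the previous one ended.
        kwargs["nextToken"] = token
```

It could be called as `list(iter_flow_summaries(client.agent_client))` with the `BedrockFlowClient` instance defined above.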

Best Practices

1. Flow Design

Keep flows modular:

# ✅ Good - Single responsibility
flow_name = "document-summarizer"

# ❌ Bad - Too many responsibilities
flow_name = "document-summarizer-translator-analyzer-reporter"

Use meaningful node names:

# ✅ Good
node_name = "ExtractKeyPoints"

# ❌ Bad
node_name = "Node1"

2. Error Handling

Add error handling nodes to catch failures:

error_handling_flow = {
    "nodes": [
        # ... other nodes ...
        {
            "name": "ErrorHandler",
            "type": "Prompt",
            "configuration": {
                "prompt": {
                    "sourceConfiguration": {
                        "inline": {
                            "templateType": "TEXT",
                            "templateConfiguration": {
                                "text": {
                                    "text": "An error occurred. Please provide a user-friendly error message."
                                }
                            }
                        }
                    }
                }
            }
        }
    ]
}
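Failures can also be handled on the caller's side. The sketch below retries an invocation when the service returns a transient error code. It is an illustration, not part of the Bedrock API: `invoke_with_retry`, `RETRYABLE_CODES`, and the backoff values are assumptions, and the error code is read from the `response` attribute that botocore's `ClientError` carries.

```python
import time

# Assumed set of transient error codes worth retrying; adjust to your workload.
RETRYABLE_CODES = {"ThrottlingException", "InternalServerException"}


def error_code(exc):
    """Return the AWS error code carried by a botocore ClientError, if any."""
    response = getattr(exc, "response", None) or {}
    return response.get("Error", {}).get("Code")


def invoke_with_retry(invoke_fn, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call invoke_fn(), retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return invoke_fn()
        except Exception as exc:
            retryable = error_code(exc) in RETRYABLE_CODES
            if not retryable or attempt == max_attempts:
                raise  # Non-transient error, or out of attempts
            sleep(base_delay * 2 ** (attempt - 1))
```

A call such as `invoke_with_retry(lambda: client.invoke(flow_id=flow_id, alias_id=alias_id, inputs=inputs))` would wrap the client used elsewhere in this guide. Errors like `AccessDeniedException` or `ValidationException` are re-raised immediately, since retrying them cannot succeed.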

3. Version Control

Always version your flows before deploying:

# Create versions for major changes
version_1 = client.create_version(flow_id, "Initial release")
version_2 = client.create_version(flow_id, "Added error handling")
version_3 = client.create_version(flow_id, "Improved prompts")

# Use aliases for environments
client.create_alias(flow_id, "development", version_3)
client.create_alias(flow_id, "staging", version_2)
client.create_alias(flow_id, "production", version_1)
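Once aliases exist, promoting a tested version is a matter of repointing the alias rather than creating a new one. A minimal sketch, assuming a boto3 `bedrock-agent` client is passed in and using its `update_flow_alias` operation; the `promote_alias` helper and its parameter names are hypothetical.

```python
def promote_alias(agent_client, flow_id, alias_id, alias_name, version):
    """Repoint an existing flow alias at a different flow version."""
    response = agent_client.update_flow_alias(
        flowIdentifier=flow_id,
        aliasIdentifier=alias_id,
        name=alias_name,  # UpdateFlowAlias expects the alias name on every call
        routingConfiguration=[{"flowVersion": str(version)}],
    )
    return response["routingConfiguration"]
```

For example, `promote_alias(boto3.client("bedrock-agent"), flow_id, alias_id, "production", "2")` would move the production alias to version 2 while callers keep using the same alias ID.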

4. Testing

Test flows thoroughly before production:

def test_flow(flow_id, alias_id, test_cases):
    """
    Test a flow with multiple test cases
    """
    results = []

    for test_case in test_cases:
        print(f"Testing: {test_case['name']}")

        result = client.invoke(
            flow_id=flow_id,
            alias_id=alias_id,
            inputs=test_case['inputs']
        )

        results.append({
            'test_case': test_case['name'],
            'input': test_case['inputs'],
            'output': result,
            'passed': validate_output(result, test_case['expected'])
        })

    return results

# Test cases
test_cases = [
    {
        'name': 'Short document',
        'inputs': {'FlowInput': {'document': 'Short text'}},
        'expected': {'length': 'short'}
    },
    {
        'name': 'Long document',
        'inputs': {'FlowInput': {'document': 'Very long text...' * 100}},
        'expected': {'length': 'long'}
    }
]

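# Invocation needs the alias ID (as returned by create_alias), not the alias name
dev_alias_id = 'YOUR_DEV_ALIAS_ID'
test_results = test_flow(flow_id, dev_alias_id, test_cases)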
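`test_flow` calls `validate_output`, which this guide does not define. One hypothetical version is sketched below. It assumes the result is a dict with an `outputs` entry holding strings (possibly nested in lists or dicts) and that `expected['length']` is either `'short'` or `'long'`; the 50-word threshold is an arbitrary assumption.

```python
def collect_text(value):
    """Gather every string nested inside dicts and lists into one string."""
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        return " ".join(collect_text(v) for v in value.values())
    if isinstance(value, (list, tuple)):
        return " ".join(collect_text(v) for v in value)
    return ""


def validate_output(result, expected, short_max_words=50):
    """Hypothetical check: the flow produced text of the expected length class."""
    text = collect_text(result.get("outputs", "")).strip()
    if not text:
        return False  # An empty output never passes
    actual_length = "short" if len(text.split()) <= short_max_words else "long"
    return actual_length == expected.get("length", actual_length)
```

Real validation would usually check content as well as length, for instance required keywords or a JSON schema.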

5. Monitoring

Monitor flow executions:

import boto3

cloudwatch = boto3.client('cloudwatch')

def log_flow_metrics(flow_id, execution_time, success):
    """
    Log custom metrics to CloudWatch
    """
    cloudwatch.put_metric_data(
        Namespace='BedrockFlows',
        MetricData=[
            {
                'MetricName': 'ExecutionTime',
                'Value': execution_time,
                'Unit': 'Seconds',
                'Dimensions': [
                    {'Name': 'FlowId', 'Value': flow_id}
                ]
            },
            {
                'MetricName': 'Success',
                'Value': 1 if success else 0,
                'Unit': 'Count',
                'Dimensions': [
                    {'Name': 'FlowId', 'Value': flow_id}
                ]
            }
        ]
    )
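To feed `log_flow_metrics`, the execution time and success flag have to be measured around each invocation. A minimal sketch of such a wrapper follows; `timed_call` and its parameters are hypothetical names, and the clock is injectable only to make the helper easy to test.

```python
import time


def timed_call(invoke_fn, record_fn, clock=time.monotonic):
    """Run invoke_fn(), then report (elapsed_seconds, success) to record_fn."""
    start = clock()
    success = False
    try:
        result = invoke_fn()
        success = True
        return result
    finally:
        # Runs on both success and failure, so failed runs are counted too
        record_fn(clock() - start, success)
```

Usage might look like `timed_call(lambda: client.invoke(flow_id, alias_id, inputs), lambda elapsed, ok: log_flow_metrics(flow_id, elapsed, ok))`. Exceptions from the invocation still propagate after the metric is recorded.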

6. Cost Optimization

Optimize token usage:

# Use appropriate max_tokens for each prompt
inference_config = {
    "text": {
        "temperature": 0.7,
        "maxTokens": 500  # Don't use 4000 if you only need 500
    }
}

Cache common results:

import hashlib
import json

cache = {}

def invoke_with_cache(flow_id, alias_id, inputs):
    """
    Cache flow results to avoid redundant invocations
    """
    # Create cache key (include flow and alias so different flows don't collide)
    cache_key = hashlib.md5(
        json.dumps([flow_id, alias_id, inputs], sort_keys=True).encode()
    ).hexdigest()

    # Check cache
    if cache_key in cache:
        print("Cache hit!")
        return cache[cache_key]

    # Invoke flow
    result = client.invoke(flow_id, alias_id, inputs)

    # Store in cache
    cache[cache_key] = result

    return result
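A plain dictionary never expires entries, so cached results can go stale after a flow is updated. The sketch below adds a time-to-live. `TTLCache`, its default of 300 seconds, and the injectable clock are assumptions made for illustration, and the cache lives only in the current process.

```python
import hashlib
import json
import time


class TTLCache:
    """Small in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds=300, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries = {}  # key -> (expires_at, value)

    @staticmethod
    def make_key(flow_id, alias_id, inputs):
        # Include the flow and alias so different flows never share entries
        payload = json.dumps([flow_id, alias_id, inputs], sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get_or_compute(self, key, compute_fn):
        """Return a fresh cached value, or call compute_fn() and store it."""
        entry = self._entries.get(key)
        if entry and entry[0] > self.clock():
            return entry[1]
        value = compute_fn()
        self._entries[key] = (self.clock() + self.ttl_seconds, value)
        return value
```

A caller would build the key with `TTLCache.make_key(flow_id, alias_id, inputs)` and pass `lambda: client.invoke(flow_id, alias_id, inputs)` as `compute_fn`. Caching only makes sense for flows whose output is acceptable to reuse, which is less likely at higher temperatures.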

AWS Code Samples and Resources

Official AWS Documentation

  1. Bedrock Flows User Guide

    • URL: https://docs.aws.amazon.com/bedrock/latest/userguide/flows.html
    • Comprehensive guide to creating and managing flows
  2. Bedrock API Reference

    • URL: https://docs.aws.amazon.com/bedrock/latest/APIReference/
    • Complete API documentation for flows
  3. Bedrock Agent Runtime API

    • URL: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_Operations_Agents_for_Amazon_Bedrock_Runtime.html
    • Runtime API for invoking flows

AWS Code Samples Repository

GitHub Repository:

https://github.com/aws-samples/amazon-bedrock-samples

Key Examples:

    • agents-and-function-calling/bedrock-flows/ - Flow examples
    • prompt-engineering/ - Prompt templates for flows
    • knowledge-bases/ - RAG integration examples

AWS SDK Examples

Python (Boto3):

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent.html

JavaScript/TypeScript:

https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/bedrock-agent/

Java:

https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/bedrockagent/package-summary.html

AWS Workshops and Tutorials

  1. Bedrock Workshop

    • URL: https://catalog.workshops.aws/amazon-bedrock/
    • Hands-on labs including flows
  2. Generative AI on AWS

    • URL: https://aws.amazon.com/generative-ai/
    • Learning paths and resources
  3. AWS Samples - Bedrock Flows

    git clone https://github.com/aws-samples/amazon-bedrock-samples.git
    cd amazon-bedrock-samples/agents-and-function-calling/bedrock-flows
    

Community Resources

AWS re:Post (Community Forum): - URL: https://repost.aws/tags/TA4kkYBfVxQ_2R5Xt8jXZDdQ/amazon-bedrock - Ask questions and share knowledge

AWS Blog Posts: - Search: "Amazon Bedrock Flows" on https://aws.amazon.com/blogs/

Sample Flow Templates

1. Customer Support Flow Template:

# Available in AWS Samples repository
# Path: amazon-bedrock-samples/agents-and-function-calling/bedrock-flows/customer-support-flow.json

2. Document Processing Flow Template:

# Available in AWS Samples repository
# Path: amazon-bedrock-samples/agents-and-function-calling/bedrock-flows/document-processing-flow.json

3. Content Generation Flow Template:

# Available in AWS Samples repository
# Path: amazon-bedrock-samples/agents-and-function-calling/bedrock-flows/content-generation-flow.json

Quick Start with AWS CLI

# List available flows
aws bedrock-agent list-flows --region us-east-1

# Get flow details
aws bedrock-agent get-flow --flow-identifier YOUR_FLOW_ID --region us-east-1

# Prepare a flow
aws bedrock-agent prepare-flow --flow-identifier YOUR_FLOW_ID --region us-east-1

# Create flow version
aws bedrock-agent create-flow-version \
    --flow-identifier YOUR_FLOW_ID \
    --description "Production version" \
    --region us-east-1

# Invoke a flow
# Note: InvokeFlow returns an event stream, and the AWS CLI does not support
# streaming Bedrock operations. Invoke flows through an SDK instead, for
# example the invoke_flow method of boto3's bedrock-agent-runtime client.
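From Python, an invocation can be sketched with two small helpers: one that builds the list-shaped `inputs` that InvokeFlow expects, and one that reads the response event stream. `build_flow_inputs` and `read_flow_stream` are hypothetical names, and the node name `FlowInput` is an assumption that must match the Input node in your flow.

```python
def build_flow_inputs(document, node_name="FlowInput", output_name="document"):
    """Build the list-of-inputs structure that InvokeFlow expects."""
    return [
        {
            "content": {"document": document},
            "nodeName": node_name,
            "nodeOutputName": output_name,
        }
    ]


def read_flow_stream(response_stream):
    """Collect output documents and the completion reason from the event stream."""
    outputs, completion_reason = [], None
    for event in response_stream:
        if "flowOutputEvent" in event:
            outputs.append(event["flowOutputEvent"]["content"]["document"])
        elif "flowCompletionEvent" in event:
            completion_reason = event["flowCompletionEvent"]["completionReason"]
    return outputs, completion_reason
```

With a client from `boto3.client("bedrock-agent-runtime")`, the call would be `response = runtime.invoke_flow(flowIdentifier=flow_id, flowAliasIdentifier=alias_id, inputs=build_flow_inputs("Your input"))`, followed by `read_flow_stream(response["responseStream"])`.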

CloudFormation Templates

Deploy flows using Infrastructure as Code:

# Available in AWS Samples
# Path: amazon-bedrock-samples/cloudformation/bedrock-flows-template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Bedrock Flow Infrastructure'

Resources:
  BedrockFlowRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: bedrock.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonBedrockFullAccess

Terraform Examples

# Available in community repositories
# Search: "terraform aws bedrock flows"

resource "aws_bedrockagent_flow" "example" {
  name               = "example-flow"
  execution_role_arn = aws_iam_role.bedrock_flow.arn

  definition {
    # Flow definition JSON
  }
}

Troubleshooting

Common Issues and Solutions

Issue 1: Flow Creation Fails

Error: ValidationException: Invalid flow definition

Solution:

# Validate your flow definition structure
def validate_flow_definition(definition):
    """
    Validate flow definition before creation
    """
    required_keys = ['nodes', 'connections']

    for key in required_keys:
        if key not in definition:
            raise ValueError(f"Missing required key: {key}")

    # Check nodes
    if not definition['nodes']:
        raise ValueError("Flow must have at least one node")

    # Check for Input and Output nodes
    node_types = [node['type'] for node in definition['nodes']]
    if 'Input' not in node_types:
        raise ValueError("Flow must have an Input node")
    if 'Output' not in node_types:
        raise ValueError("Flow must have an Output node")

    print("✓ Flow definition is valid")
    return True

# Use before creating flow
validate_flow_definition(flow_definition)
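A definition can contain both required keys and still be rejected because a connection points at a node that does not exist. The sketch below checks for that, assuming each connection carries `source` and `target` node names as in the flow definition format; `find_connection_problems` is a hypothetical helper, not an AWS API.

```python
def find_connection_problems(definition):
    """Return messages for connections that reference undefined nodes."""
    node_names = {node["name"] for node in definition.get("nodes", [])}
    problems = []
    for connection in definition.get("connections", []):
        for end in ("source", "target"):
            if connection.get(end) not in node_names:
                problems.append(
                    f"Connection {connection.get('name')!r}: "
                    f"{end} {connection.get(end)!r} is not a defined node"
                )
    return problems
```

An empty list means every connection endpoint resolves to a node. Running this alongside `validate_flow_definition` catches typos in node names before the service does.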

Issue 2: Flow Invocation Fails

Error: ResourceNotFoundException: Flow not found

Solution:

# Ensure flow is prepared before invoking
def safe_invoke_flow(flow_id, alias_id, inputs):
    """
    Safely invoke a flow with error handling
    """
    try:
        # Check if flow exists
        flow = bedrock_agent.get_flow(flowIdentifier=flow_id)
        print(f"Flow found: {flow['name']}")

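        # Check if flow is prepared
        if flow['status'] != 'Prepared':
            print("Flow not prepared. Preparing now...")
            bedrock_agent.prepare_flow(flowIdentifier=flow_id)

            # Poll until preparation finishes instead of sleeping a fixed time
            import time
            while bedrock_agent.get_flow(flowIdentifier=flow_id)['status'] == 'Preparing':
                time.sleep(2)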

        # Invoke flow
        result = bedrock_agent_runtime.invoke_flow(
            flowIdentifier=flow_id,
            flowAliasIdentifier=alias_id,
            inputs=inputs
        )

        return result

    except bedrock_agent.exceptions.ResourceNotFoundException:
        print(f"Error: Flow {flow_id} not found")
        return None
    except Exception as e:
        print(f"Error invoking flow: {e}")
        return None
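Preparation is asynchronous, so it is safer to poll the flow status with a deadline than to assume it finishes within a fixed delay. A minimal sketch, assuming the status values `Prepared`, `Preparing`, and `Failed` reported by `get_flow`; `wait_until_prepared` and its defaults are hypothetical.

```python
import time


def wait_until_prepared(get_status, timeout=60.0, interval=2.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns 'Prepared'; raise on failure or timeout."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == "Prepared":
            return status
        if status == "Failed":
            raise RuntimeError("Flow preparation failed")
        if clock() >= deadline:
            raise TimeoutError(f"Flow still {status!r} after {timeout} seconds")
        sleep(interval)
```

It could be called as `wait_until_prepared(lambda: bedrock_agent.get_flow(flowIdentifier=flow_id)['status'])` right after `prepare_flow`.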

Issue 3: Prompt Variables Not Replaced

Error: Variables like {{variable}} appear in output

Solution:

# Ensure inputs match prompt variable names exactly
prompt_template = "Summarize this: {{document}}"

# ✅ Correct - variable name matches
inputs = {
    'FlowInput': {
        'document': 'Your text here'  # Matches {{document}}
    }
}

# ❌ Wrong - variable name doesn't match
inputs = {
    'FlowInput': {
        'text': 'Your text here'  # Doesn't match {{document}}
    }
}
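Mismatched names are easy to catch before invoking the flow. The sketch below extracts the `{{variable}}` placeholders from a template and compares them with the names supplied; `check_prompt_variables` is a hypothetical helper, and the regular expression assumes variable names made of letters, digits, and underscores.

```python
import re

# Matches {{name}} and tolerates spaces inside the braces
VARIABLE_PATTERN = re.compile(r"\{\{\s*([A-Za-z0-9_]+)\s*\}\}")


def check_prompt_variables(prompt_template, provided):
    """Compare {{variables}} in a template with the names actually provided."""
    wanted = set(VARIABLE_PATTERN.findall(prompt_template))
    given = set(provided)
    return {"missing": sorted(wanted - given), "unused": sorted(given - wanted)}
```

For the wrong example above, `check_prompt_variables(prompt_template, inputs['FlowInput'])` reports `document` as missing and `text` as unused.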

Issue 4: Flow Execution Timeout

Error: Flow takes too long to execute

Solution:

# Optimize your flow
optimization_tips = {
    "reduce_max_tokens": "Set appropriate maxTokens for each prompt",
    "parallel_execution": "Use parallel branches where possible",
    "cache_results": "Cache intermediate results",
    "simplify_prompts": "Make prompts more concise"
}

# Example: Reduce max tokens
inference_config = {
    "text": {
        "maxTokens": 500,  # Instead of 4000
        "temperature": 0.7
    }
}

Issue 5: Permission Denied

Error: AccessDeniedException

Solution:

# Ensure IAM role has correct permissions
iam_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": "arn:aws:bedrock:*::foundation-model/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:CreateFlow",
                "bedrock:UpdateFlow",
                "bedrock:GetFlow",
                "bedrock:InvokeFlow"
            ],
            "Resource": "*"
        }
    ]
}
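Before attaching a policy, a quick local check can show which required actions it never allows. The sketch below is a deliberate simplification: it only matches `Action` patterns in `Allow` statements and ignores `Deny` statements, `Resource`, and `Condition`, so it is no substitute for the IAM policy simulator. `missing_actions` is a hypothetical helper and assumes `Statement` is a list.

```python
from fnmatch import fnmatchcase


def as_list(value):
    """IAM allows a single string or a list for Action; normalize to a list."""
    return [value] if isinstance(value, str) else list(value)


def missing_actions(policy, required_actions):
    """Return required actions that no Allow statement in the policy grants."""
    allowed_patterns = [
        pattern.lower()  # IAM action names are case-insensitive
        for statement in policy.get("Statement", [])
        if statement.get("Effect") == "Allow"
        for pattern in as_list(statement.get("Action", []))
    ]
    return [
        action
        for action in required_actions
        if not any(fnmatchcase(action.lower(), p) for p in allowed_patterns)
    ]
```

Calling `missing_actions(iam_policy, ["bedrock:PrepareFlow", "bedrock:InvokeFlow"])` on the policy above returns `["bedrock:PrepareFlow"]`, which points at a likely cause of an `AccessDeniedException` during preparation.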

Debugging Tips

Enable Trace Logging

def invoke_with_trace(flow_id, alias_id, inputs):
    """
    Invoke flow with detailed trace logging
    """
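    response = bedrock_agent_runtime.invoke_flow(
        flowIdentifier=flow_id,
        flowAliasIdentifier=alias_id,
        inputs=inputs,
        enableTrace=True  # Without this, no trace events are emitted
    )

    for event in response['responseStream']:
        # Log trace events
        if 'flowTraceEvent' in event:
            # The trace is a union keyed by trace type (e.g. nodeInputTrace,
            # nodeOutputTrace); node name and timestamp live inside it
            for trace_type, details in event['flowTraceEvent']['trace'].items():
                print(f"\n[TRACE] Type: {trace_type}")
                print(f"[TRACE] Node: {details.get('nodeName')}")
                print(f"[TRACE] Timestamp: {details.get('timestamp')}")
                # default=str handles datetime values in the trace
                print(f"[TRACE] Details: {json.dumps(details, indent=2, default=str)}")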

        # Log output events
        elif 'flowOutputEvent' in event:
            output = event['flowOutputEvent']
            print(f"\n[OUTPUT] Node: {output.get('nodeName')}")
            print(f"[OUTPUT] Content: {output.get('content')}")

Test Individual Nodes

def test_prompt_node(prompt_template, test_inputs):
    """
    Test a prompt node independently
    """
    # Replace variables manually
    prompt = prompt_template
    for key, value in test_inputs.items():
        prompt = prompt.replace(f"{{{{{key}}}}}", str(value))

    print(f"Rendered prompt:\n{prompt}\n")

    # Test with model directly
    response = bedrock_runtime.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=[
            {"role": "user", "content": [{"text": prompt}]}
        ]
    )

    return response['output']['message']['content'][0]['text']

# Test before adding to flow
result = test_prompt_node(
    "Summarize: {{document}}",
    {"document": "Test document text"}
)
print(f"Result: {result}")

Getting Help

AWS Support: - Open a support case in AWS Console - Include flow ID, error messages, and trace logs

Community: - AWS re:Post: https://repost.aws/tags/TA4kkYBfVxQ_2R5Xt8jXZDdQ/amazon-bedrock - Stack Overflow: Tag amazon-bedrock

Documentation: - Bedrock Flows Guide: https://docs.aws.amazon.com/bedrock/latest/userguide/flows.html - API Reference: https://docs.aws.amazon.com/bedrock/latest/APIReference/


Conclusion

AWS Bedrock Flows provide a powerful way to build complex generative AI workflows without extensive coding. By chaining together prompts, models, and logic, you can create sophisticated applications that handle multi-step reasoning, data processing, and content generation.

Key Takeaways: - Flows are visual, reusable workflows for AI tasks - Use nodes to define steps and connections to define flow - Version and alias flows for safe deployments - Test thoroughly before production - Monitor and optimize for cost and performance

Next Steps: 1. Try the simple examples in this guide 2. Explore AWS code samples repository 3. Build your first production flow 4. Join the AWS community for support

For the latest updates and features, always refer to the official AWS Bedrock documentation.