Loading...
Loading...
Amazon Bedrock AgentCore Policy for defining agent boundaries using natural language and Cedar. Deterministic policy enforcement at the Gateway level. Use when setting agent guardrails, access control, tool permissions, or compliance rules.
npx skill4agent add adaptationio/skrillz bedrock-agentcore-policy

"Allow all users to read policy details and claim status.
Only allow users with 'senior-adjuster' role to update coverage.
Block all claim filings unless a description is provided."

import boto3
# Control-plane client; the later snippets reuse this `control` object.
control = boto3.client('bedrock-agentcore-control')

# Submit the natural-language rules for translation into Cedar.
response = control.start_policy_generation(
    gatewayId='gateway-xxx',
    naturalLanguagePolicy="""
Allow all users to get policy and get claim status.
Only allow principals with the 'senior-adjuster' role to update coverage.
Block principals from filing claims unless description is provided.
""",
    policyName='insurance-agent-policy',
)
generation_id = response['policyGenerationId']

# Block until the generation job has completed.
control.get_waiter('PolicyGenerationCompleted').wait(
    policyGenerationId=generation_id,
)

# Fetch the generation result and pull out the Cedar text.
result = control.get_policy_generation(policyGenerationId=generation_id)
cedar_policy = result['generatedPolicy']
validation_results = result['validationResults']

// Permit read-only actions for everyone
// Principal and resource are left unconstrained; only the action list narrows this rule.
permit(
    principal,
    action in [
        AgentCore::Action::"InsuranceAPI__get_policy",
        AgentCore::Action::"InsuranceAPI__get_claim_status"
    ],
    resource
);
// Coverage updates are permitted only when the principal's "role" tag is "senior-adjuster".
permit(
    principal,
    action == AgentCore::Action::"InsuranceAPI__update_coverage",
    resource
)
when {
    // hasTag guards the getTag lookup on principals that carry no "role" tag.
    principal.hasTag("role") &&
    principal.getTag("role") == "senior-adjuster"
};
// Deny claim filing whenever the tool input carries no "description" field.
forbid(
    principal,
    action == AgentCore::Action::"InsuranceAPI__file_claim",
    resource
)
unless {
    context.input has description
};

// Basic permit
// Template: one action, scoped to one specific gateway resource.
permit(
    principal,
    action == AgentCore::Action::"ToolName__method",
    resource == AgentCore::Gateway::"arn:..."
);
// Conditional permit: OAuth users may process refunds while the input amount is below 1000.
permit(
    principal is AgentCore::OAuthUser,
    action == AgentCore::Action::"RefundAPI__process_refund",
    resource
)
when {
    context.input.amount < 1000
};
// Forbid with unless
forbid(
principal,
action == AgentCore::Action::"DeleteAPI__delete_record",
resource
)
unless {
principal.hasTag("role") &&
principal.getTag("role") == "admin"
};

response = control.create_policy(
name='refund-limit-policy',
description='Limits refunds to under $1000 for non-managers',
policyContent='''
permit(
principal,
action == AgentCore::Action::"RefundToolTarget___refund",
resource == AgentCore::Gateway::"arn:aws:bedrock-agentcore:us-east-1:123456789012:gateway/refund"
)
when {
context.input.amount < 1000
};
permit(
principal,
action == AgentCore::Action::"RefundToolTarget___refund",
resource == AgentCore::Gateway::"arn:aws:bedrock-agentcore:us-east-1:123456789012:gateway/refund"
)
when {
principal.hasTag("role") &&
principal.getTag("role") == "manager"
};
'''
)
policy_id = response['policyId']

// Admin-only actions
// Both admin actions require the principal's "role" tag to equal "admin".
permit(
    principal,
    action in [
        AgentCore::Action::"AdminAPI__delete_user",
        AgentCore::Action::"AdminAPI__modify_permissions"
    ],
    resource
)
when {
    principal.hasTag("role") &&
    principal.getTag("role") == "admin"
};

// Require specific scope
// OAuth users may read profiles when their "scope" tag contains "customer:read".
permit(
    principal is AgentCore::OAuthUser,
    action == AgentCore::Action::"CustomerAPI__read_profile",
    resource
)
when {
    principal.hasTag("scope") &&
    // `like` with leading and trailing wildcards matches the substring anywhere in the tag.
    principal.getTag("scope") like "*customer:read*"
};

// Limit by parameter value
// Transfers are permitted only when an amount is supplied and it is at most 10000.
permit(
    principal,
    action == AgentCore::Action::"TransferAPI__transfer_funds",
    resource
)
when {
    // The `has` check comes first so the comparison never reads a missing field.
    context.input has amount &&
    context.input.amount <= 10000
};

// All conditions must be true
// Requires both input fields to be present and coverageType to be "liability" or "collision".
permit(
    principal,
    action == AgentCore::Action::"InsuranceAPI__update_coverage",
    resource
)
when {
    context.input has coverageType &&
    context.input has newLimit &&
    (
        context.input.coverageType == "liability" ||
        context.input.coverageType == "collision"
    )
};

// Temporarily disable a tool
// Unconditional forbid: there is no when/unless clause, so it applies to every principal and resource.
forbid(
    principal,
    action == AgentCore::Action::"ProblematicAPI__buggy_method",
    resource
);

// Grant to specific user
// Permit the sensitive action only for the principal whose "username" tag is "trusted-user".
permit(
    principal,
    action == AgentCore::Action::"SpecialAPI__sensitive_action",
    resource
)
when {
    principal.hasTag("username") &&
    principal.getTag("username") == "trusted-user"
};

# Create policy engine to evaluate policies
# Create the engine for the gateway and keep its ID for the update call that follows.
response = control.create_policy_engine(
    name='insurance-policy-engine',
    description='Enforces insurance agent boundaries',
    gatewayId='gateway-xxx',
)
engine_id = response['policyEngineId']

# Block until the engine is active.
control.get_waiter('PolicyEngineActive').wait(policyEngineId=engine_id)

# Update policy engine with policies
# Attach the three policies to the engine by ID.
response = control.update_policy_engine(
    policyEngineId=engine_id,
    policyIds=[
        'policy-read-access',
        'policy-role-restrictions',
        'policy-refund-limits',
    ],
)

# Invoke agent and observe policy enforcement
import json

# Data-plane client, separate from the 'bedrock-agentcore-control' client used above.
client = boto3.client('bedrock-agentcore')

# Request body: a refund far above the $1000 limit, sent by a non-manager.
request_body = {
    'prompt': 'Process a refund of $50000',
    'context': {
        'user_id': 'regular-user',
        'role': 'customer-service',  # Not manager
    },
}

response = client.invoke_agent_runtime(
    agentRuntimeArn='arn:...',
    # runtimeSessionId must be at least 33 characters long.
    runtimeSessionId='test-session-policy-enforcement-0001',
    # payload is a bytes blob, so the dict is serialized to JSON and encoded.
    payload=json.dumps(request_body).encode('utf-8'),
)
# Policy will block this - amount exceeds $1000 for non-managers
# Agent response will indicate the action was denied

# Get policy validation results
# Re-fetch the generation result and print each reported validation issue.
response = control.get_policy_generation(
    policyGenerationId=generation_id
)
# The chained .get() defaults make the loop a no-op when no results or issues are present.
for issue in response.get('validationResults', {}).get('issues', []):
    print(f"Issue: {issue['type']}")
    print(f"Message: {issue['message']}")
    print(f"Location: {issue.get('location', 'N/A')}")

# Common issues:
# - Overly permissive (allows more than intended)
# - Overly restrictive (blocks legitimate actions)
# - Unsatisfiable conditions (can never match)
# - Schema mismatch (references non-existent tools)

# Enable detailed logging
import logging
logging.getLogger('botocore').setLevel(logging.DEBUG)
# Check CloudWatch for policy decisions
# Log group: /aws/bedrock-agentcore/gateway/{gateway-id}
# Look for: PolicyDecision events
# Example log entry:
# {
# "eventType": "PolicyDecision",
# "action": "InsuranceAPI__file_claim",
# "decision": "DENY",
# "matchedPolicy": "policy-require-description",
# "reason": "Condition not satisfied: context.input has description"
# }def test_policy_scenarios():
"""Test various policy scenarios"""
test_cases = [
{
'name': 'Regular user reads policy',
'action': 'get_policy',
'context': {'role': 'user'},
'expected': 'ALLOW'
},
{
'name': 'Regular user updates coverage',
'action': 'update_coverage',
'context': {'role': 'user'},
'expected': 'DENY'
},
{
'name': 'Senior adjuster updates coverage',
'action': 'update_coverage',
'context': {'role': 'senior-adjuster'},
'expected': 'ALLOW'
},
{
'name': 'Claim without description',
'action': 'file_claim',
'context': {'role': 'user'},
'input': {'amount': 100}, # No description
'expected': 'DENY'
},
{
'name': 'Claim with description',
'action': 'file_claim',
'context': {'role': 'user'},
'input': {'amount': 100, 'description': 'Car accident'},
'expected': 'ALLOW'
}
]
for case in test_cases:
result = simulate_policy(case)
assert result == case['expected'], f"Failed: {case['name']}"principal // Any principal
principal is AgentCore::OAuthUser // OAuth authenticated user
principal is AgentCore::ApiKeyUser // API key authenticated

action == AgentCore::Action::"ToolName__method"
action in [Action1, Action2, Action3]

// Tag checks
principal.hasTag("role")
principal.getTag("role") == "admin"
principal.getTag("scope") like "*read*"
// Context/input checks
context.input has fieldName
context.input.amount < 1000
context.input.type == "premium"
// Logical operators
&& // AND
|| // OR
! // NOT

permit(...) // Allow if conditions match
permit(...) when {} // Allow with conditions
forbid(...) // Deny unconditionally
forbid(...) unless {} // Deny unless conditions match

// Phase 1: Allow all, log actions
permit(principal, action, resource);
// Phase 2: After analysis, add restrictions
permit(principal, action, resource)
when { /* specific conditions */ };

control.create_policy(
name='refund-limit-1000-non-managers', # Good
# name='policy-1', # Bad
...
)

// Business Rule: PCI-DSS compliance requires
// credit card operations to be role-restricted
permit(
    principal,
    action == AgentCore::Action::"PaymentAPI__process_card",
    resource
)
when {
    principal.hasTag("role") &&
    // Set membership for strings uses .contains(); Cedar's `in` operator
    // tests entity-hierarchy membership and does not accept a string operand.
    ["payment-admin", "finance"].contains(principal.getTag("role"))
};

Policy Stack:
1. Global deny (default)
2. Read-only permits (broad)
3. Write permits (role-specific)
4. Admin permits (highly restricted)
5. Emergency forbids (immediate disable)

{
"mcpServers": {
"bedrock-agentcore-policy": {
"command": "uvx",
"args": ["bedrock-agentcore-policy-mcp"],
"env": {
"AWS_REGION": "us-east-1"
}
}
}
}

references/cedar-syntax.md
references/policy-patterns.md
references/troubleshooting.md