Examples

This section contains complete examples demonstrating how to use the Community DevOps Agent API.

Basic Usage

  1#!/usr/bin/env python3
  2"""
  3Basic usage examples for the Community DevOps Agent API.
  4
  5This script demonstrates fundamental usage patterns for the boto3 integration,
  6including service registration, client creation, and basic operations.
  7"""
  8
  9import boto3
 10import devopsagent_api
 11from devopsagent_api.exceptions import DevOpsAgentError
 12from config import get_config
 13
 14
 15def main():
 16    """Demonstrate basic usage of the DevOps Agent API."""
 17
 18    print("πŸ€– Community DevOps Agent API - Basic Usage Examples")
 19    print("=" * 60)
 20
 21    # Get configuration
 22    config = get_config()
 23    config.print_configuration()
 24
 25    # Check service registration status
 26    print("\n1. Service Registration Status")
 27    print("-" * 30)
 28    if devopsagent_api.is_service_registered():
 29        print("βœ… Service successfully registered with boto3")
 30    else:
 31        print("❌ Service registration failed")
 32        print("   This may be due to missing dependencies or configuration issues")
 33        return
 34
 35    # Create a client
 36    print("\n2. Client Creation")
 37    print("-" * 30)
 38    try:
 39        # Basic client creation using config
 40        client = config.get_client()
 41        print("βœ… Client created successfully")
 42        print(f"   Service name: {client.meta.service_model.service_name}")
 43        print(f"   API version: {client.meta.service_model.api_version}")
 44        print(f"   Endpoint: {client._endpoint.host}")
 45
 46    except Exception as e:
 47        print(f"❌ Failed to create client: {e}")
 48        return
 49
 50    # Demonstrate available operations
 51    print("\n3. Available Operations")
 52    print("-" * 30)
 53    operations_by_category = config.get_operations_by_category()
 54    total_ops = sum(len(ops) for ops in operations_by_category.values())
 55    print(f"Total operations available: {total_ops}")
 56
 57    for category, operations in operations_by_category.items():
 58        print(f"\n{category}:")
 59        for op in operations:
 60            print(f"  β€’ {op}")
 61
 62    # Demonstrate paginators
 63    print("\n4. Available Paginators")
 64    print("-" * 30)
 65    try:
 66        available_paginators = config.get_available_paginators()
 67        print("Automatic pagination available for:")
 68        for name in available_paginators:
 69            try:
 70                paginator = client.get_paginator(name)
 71                print(f"  β€’ {name} (page size: 50-1000)")
 72            except Exception:
 73                print(f"  β€’ {name} (not available)")
 74    except Exception as e:
 75        print(f"❌ Error checking paginators: {e}")
 76
 77    # Demonstrate waiters
 78    print("\n5. Available Waiters")
 79    print("-" * 30)
 80    try:
 81        available_waiters = config.get_available_waiters()
 82        print("Long-running operation waiters:")
 83        for name in available_waiters:
 84            try:
 85                waiter = client.get_waiter(name)
 86                print(f"  β€’ {name} (polling with backoff)")
 87            except Exception:
 88                print(f"  β€’ {name} (not available)")
 89    except Exception as e:
 90        print(f"❌ Error checking waiters: {e}")
 91
 92    # Demonstrate configuration options
 93    print("\n6. Client Configuration Options")
 94    print("-" * 30)
 95    from botocore.config import Config
 96
 97    # Example configurations
 98    configs = [
 99        ("Default", {}),
100        ("With retries", {"retries": {"max_attempts": 5}}),
101        ("With timeouts", {"connect_timeout": 10, "read_timeout": 30}),
102        ("With retry mode", {"retries": {"mode": "adaptive"}}),
103    ]
104
105    for name, config_dict in configs:
106        try:
107            boto_config = Config(**config_dict)
108            test_client = boto3.client(
109                'community-devops-agent',
110                region_name=config.region,
111                config=boto_config
112            )
113            print(f"  β€’ {name}: βœ… Configured successfully")
114        except Exception as e:
115            print(f"  β€’ {name}: ❌ Configuration failed: {e}")
116
117    # Demonstrate session usage
118    print("\n7. Using boto3 Sessions")
119    print("-" * 30)
120    try:
121        # Create a session (useful for credential management)
122        session = boto3.Session(region_name=config.region)
123
124        # Create client from session
125        session_client = session.client('community-devops-agent')
126        print("βœ… Session-based client created successfully")
127        print(f"   Region: {session.region_name}")
128        print(f"   Profile: {session.profile_name or 'default'}")
129
130    except Exception as e:
131        print(f"❌ Session client creation failed: {e}")
132
133    # Error handling demonstration
134    print("\n8. Error Handling")
135    print("-" * 30)
136    print("The library provides comprehensive error handling:")
137    print("β€’ DevOpsAgentError: Base exception for API errors")
138    print("β€’ AuthenticationError: Credential and auth issues")
139    print("β€’ ValidationException: Invalid request parameters")
140    print("β€’ ResourceNotFoundException: Missing resources")
141    print("β€’ ThrottlingException: Rate limiting")
142    print("β€’ And more... (see devopsagent_api.exceptions)")
143
144    # Demonstrate import patterns
145    print("\n9. Import Patterns")
146    print("-" * 30)
147    print("Recommended import pattern:")
148    print("  import devopsagent_api  # Registers service")
149    print("  import boto3")
150    print("  client = boto3.client('community-devops-agent')")
151    print()
152    print("For type hints and models:")
153    print("  from devopsagent_api.models import Task, TaskStatus")
154    print("  from devopsagent_api.exceptions import DevOpsAgentError")
155
156    print("\nπŸŽ‰ Basic usage demonstration complete!")
157    print("\nNext steps:")
158    print("β€’ Run 'python examples/list_tasks.py' for task operations")
159    print("β€’ Check examples/README.md for detailed usage patterns")
160    print("β€’ See implementationPlan.md for development roadmap")
161
162
163if __name__ == "__main__":
164    main()

List Tasks

  1#!/usr/bin/env python3
  2"""
  3Task listing examples for the Community DevOps Agent API.
  4
  5This script demonstrates how to list and filter tasks using the boto3 integration,
  6including pagination, sorting, and various filter combinations.
  7"""
  8
  9import os
 10import sys
 11import boto3
 12import devopsagent_api
 13from devopsagent_api.models import TaskStatus, TaskType, TaskPriority, SortField, SortOrder
 14from devopsagent_api.exceptions import DevOpsAgentError
 15from datetime import datetime, timedelta
 16from config import get_config
 17
 18
 19def get_agent_space_id():
 20    """Get agent space ID from environment or prompt user."""
 21    agent_space_id = os.getenv('AGENT_SPACE_ID')
 22    if not agent_space_id:
 23        print("❌ AGENT_SPACE_ID environment variable not set")
 24        print("   Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
 25        print("   Or run: AGENT_SPACE_ID=your-uuid python examples/list_tasks.py")
 26        sys.exit(1)
 27    return agent_space_id
 28
 29
 30def list_all_tasks(client, agent_space_id):
 31    """Demonstrate basic task listing without filters."""
 32    print("\nπŸ“‹ Basic Task Listing")
 33    print("-" * 30)
 34
 35    try:
 36        response = client.list_tasks(agentSpaceId=agent_space_id)
 37
 38        tasks = response.get('tasks', [])
 39        print(f"Found {len(tasks)} tasks")
 40
 41        if tasks:
 42            for i, task in enumerate(tasks[:5], 1):  # Show first 5
 43                print(f"{i}. {task['title'][:50]}... ({task['status']})")
 44            if len(tasks) > 5:
 45                print(f"   ... and {len(tasks) - 5} more tasks")
 46        else:
 47            print("   No tasks found in this agent space")
 48
 49        return tasks
 50
 51    except DevOpsAgentError as e:
 52        print(f"❌ API Error: {e}")
 53        return []
 54    except Exception as e:
 55        print(f"❌ Unexpected error: {e}")
 56        return []
 57
 58
 59def list_tasks_with_filters(client, agent_space_id):
 60    """Demonstrate task listing with various filters."""
 61    print("\nπŸ” Task Listing with Filters")
 62    print("-" * 35)
 63
 64    # Filter for completed investigation tasks
 65    print("\n1. Completed Investigation Tasks")
 66    try:
 67        response = client.list_tasks(
 68            agentSpaceId=agent_space_id,
 69            filter={
 70                'taskType': ['INVESTIGATION'],
 71                'status': ['COMPLETED']
 72            },
 73            limit=10
 74        )
 75        tasks = response.get('tasks', [])
 76        print(f"   Found {len(tasks)} completed investigation tasks")
 77        for task in tasks:
 78            print(f"   β€’ {task['title'][:40]}... (Priority: {task['priority']})")
 79
 80    except DevOpsAgentError as e:
 81        print(f"   ❌ Filter error: {e}")
 82
 83    # Filter for high-priority tasks
 84    print("\n2. High Priority Tasks")
 85    try:
 86        response = client.list_tasks(
 87            agentSpaceId=agent_space_id,
 88            filter={
 89                'priority': ['HIGH']  # Single item list
 90            },
 91            limit=5
 92        )
 93        tasks = response.get('tasks', [])
 94        print(f"   Found {len(tasks)} high-priority tasks")
 95        for task in tasks:
 96            print(f"   β€’ {task['title'][:40]}... ({task['priority']}, {task['status']})")
 97
 98    except DevOpsAgentError as e:
 99        print(f"   ❌ Filter error: {e}")
100
101    # Filter by date range (last 7 days)
102    print("\n3. Recently Created Tasks (Last 7 Days)")
103    try:
104        seven_days_ago = (datetime.utcnow() - timedelta(days=7)).isoformat() + 'Z'
105        response = client.list_tasks(
106            agentSpaceId=agent_space_id,
107            filter={
108                'createdAfter': seven_days_ago
109            },
110            sortField='CREATED_AT',
111            order='DESC'
112        )
113        tasks = response.get('tasks', [])
114        print(f"   Found {len(tasks)} tasks created in the last 7 days")
115        for task in tasks[:3]:
116            created = datetime.fromisoformat(task['createdAt'].replace('Z', '+00:00'))
117            print(f"   β€’ {task['title'][:35]}... (Created: {created.strftime('%Y-%m-%d %H:%M')})")
118
119    except DevOpsAgentError as e:
120        print(f"   ❌ Date filter error: {e}")
121
122
def demonstrate_pagination(client, agent_space_id):
    """Show manual token-based pagination and the boto3 paginator.

    Part 1 pages through ``list_tasks`` three tasks at a time, following the
    continuation token until it runs out or nine tasks have been seen.
    Part 2 uses ``client.get_paginator('list_tasks')`` with ``MaxItems=10``
    and ``PageSize=2``. A ``DevOpsAgentError`` in either part is printed.

    Args:
        client: Client exposing ``list_tasks`` and ``get_paginator``.
        agent_space_id: Identifier of the agent space to query.
    """
    print("\n📄 Pagination Examples")
    print("-" * 25)

    # Manual pagination
    print("\n1. Manual Pagination")
    try:
        page_size = 3
        demo_limit = 9  # Stop early so the demo stays short
        next_token = None
        total_tasks = 0

        while True:
            request = {'agentSpaceId': agent_space_id, 'limit': page_size}
            # Send the token only once there is one: an explicit None is not
            # a valid value for a string parameter on the first request.
            if next_token:
                request['nextToken'] = next_token
            response = client.list_tasks(**request)

            tasks = response.get('tasks', [])
            total_tasks += len(tasks)
            print(f"   Page with {len(tasks)} tasks (total so far: {total_tasks})")

            for task in tasks:
                print(f"     • {task['title'][:30]}...")

            # The continuation token is read under the same camelCase name
            # used to send it; the old 'next_token' key never matched, so
            # only the first page was ever fetched.
            next_token = response.get('nextToken')

            if not next_token or total_tasks >= demo_limit:
                break

        print(f"   Total tasks retrieved: {total_tasks}")

    except DevOpsAgentError as e:
        print(f"   ❌ Manual pagination error: {e}")

    # Automatic pagination with boto3 paginator
    print("\n2. Automatic Pagination (Paginator)")
    try:
        paginator = client.get_paginator('list_tasks')
        page_iterator = paginator.paginate(
            agentSpaceId=agent_space_id,
            PaginationConfig={
                'MaxItems': 10,  # Total items to retrieve
                'PageSize': 2    # Items per page
            }
        )

        total_retrieved = 0
        page_count = 0

        for page_count, page in enumerate(page_iterator, 1):
            tasks = page.get('tasks', [])
            total_retrieved += len(tasks)
            print(f"   Page {page_count}: {len(tasks)} tasks")

        print(f"   Total tasks retrieved with paginator: {total_retrieved}")

    except DevOpsAgentError as e:
        print(f"   ❌ Paginator error: {e}")
185
186
def demonstrate_sorting(client, agent_space_id):
    """Query the first three tasks under several sort orders and print them.

    For each (sort field, order) pair the task's relevant value is shown in
    brackets: the priority, or the date part (first ten characters) of the
    created/updated timestamp. A ``DevOpsAgentError`` for one sort option is
    printed and the remaining options still run.

    Args:
        client: Client exposing ``list_tasks``.
        agent_space_id: Identifier of the agent space to query.
    """
    print("\n🔀 Sorting Examples")
    print("-" * 20)

    sort_options = [
        ('CREATED_AT', 'DESC', 'Newest First'),
        ('CREATED_AT', 'ASC', 'Oldest First'),
        ('UPDATED_AT', 'DESC', 'Recently Updated'),
        ('PRIORITY', 'DESC', 'Highest Priority First'),
    ]

    # How to render the value a task was sorted by, keyed by sort field.
    key_renderers = {
        'PRIORITY': lambda task: task['priority'],
        'CREATED_AT': lambda task: task['createdAt'][:10],  # Date part only
        'UPDATED_AT': lambda task: task['updatedAt'][:10],
    }

    for sort_field, order, description in sort_options:
        try:
            response = client.list_tasks(
                agentSpaceId=agent_space_id,
                sortField=sort_field,
                order=order,
                limit=3
            )

            tasks = response.get('tasks', [])
            print(f"\n{sort_field} {order} ({description}):")
            render = key_renderers.get(sort_field)
            for i, task in enumerate(tasks, 1):
                key_value = render(task) if render else "N/A"
                print(f"   {i}. [{key_value}] {task['title'][:30]}...")

        except DevOpsAgentError as e:
            print(f"   ❌ Sorting error for {sort_field} {order}: {e}")
224
225
def demonstrate_error_handling(client, agent_space_id):
    """Trigger expected API errors from ``list_tasks`` and report the outcome.

    Two calls are made that are expected to raise ``DevOpsAgentError``: one
    with a malformed agent space ID and one with an unknown status filter.
    If a call unexpectedly succeeds, a failure line is printed instead. The
    third section only prints a list of other error types.

    Args:
        client: Client exposing ``list_tasks``.
        agent_space_id: Valid agent space identifier used for the second call.
    """
    print("\n🚨 Error Handling Examples")
    print("-" * 28)

    # Invalid agent space ID
    print("\n1. Invalid Agent Space ID")
    try:
        client.list_tasks(agentSpaceId="invalid-uuid")
        print("   ❌ Should have failed with invalid UUID")
    except DevOpsAgentError as e:
        print(f"   ✅ Correctly caught error: {type(e).__name__}")
        print(f"      Message: {str(e)[:60]}...")

    # Invalid filter values
    print("\n2. Invalid Filter Values")
    try:
        client.list_tasks(
            agentSpaceId=agent_space_id,
            filter={
                'status': ['INVALID_STATUS']
            }
        )
        print("   ❌ Should have failed with invalid status")
    except DevOpsAgentError as e:
        print(f"   ✅ Correctly caught validation error: {type(e).__name__}")

    # Network/permission errors would be caught here in real usage
    print("\n3. Network/Permission Error Simulation")
    print("   (In real usage, these would be caught automatically)")
    print("   • ThrottlingException: Rate limiting")
    print("   • UnauthorizedException: Invalid credentials")
    print("   • ResourceNotFoundException: Agent space not found")
259
260
def performance_comparison(client, agent_space_id):
    """Time an unfiltered fetch, a client-side filter and a server-side filter.

    Prints the elapsed time and result count of each step, then whether the
    client-side and server-side COMPLETED counts agree. A ``DevOpsAgentError``
    from either API call is printed.

    Args:
        client: Client exposing ``list_tasks``.
        agent_space_id: Identifier of the agent space to query.
    """
    print("\n⚡ Performance Comparison")
    print("-" * 27)

    import time

    # perf_counter() is monotonic and high-resolution, so short intervals are
    # measured reliably; time.time() can jump with wall-clock adjustments.
    clock = time.perf_counter

    try:
        # Get all tasks first
        start_time = clock()
        all_response = client.list_tasks(agentSpaceId=agent_space_id)
        all_tasks = all_response.get('tasks', [])
        fetch_time = clock() - start_time

        print(f"   Fetch all tasks: {fetch_time:.2f}s ({len(all_tasks)} tasks)")

        # Simulate client-side filtering
        start_time = clock()
        completed_tasks = [t for t in all_tasks if t['status'] == 'COMPLETED']
        client_filter_time = clock() - start_time

        print(f"   Client-side filter: {client_filter_time:.2f}s ({len(completed_tasks)} tasks)")

        # Server-side filtering
        start_time = clock()
        server_response = client.list_tasks(
            agentSpaceId=agent_space_id,
            filter={'status': ['COMPLETED']}
        )
        server_tasks = server_response.get('tasks', [])
        server_filter_time = clock() - start_time

        print(f"   Server-side filter: {server_filter_time:.2f}s ({len(server_tasks)} tasks)")
        # Only the counts are compared, not the task contents.
        print(f"   Results match: {len(completed_tasks) == len(server_tasks)}")

    except DevOpsAgentError as e:
        print(f"   ❌ Performance test error: {e}")
298
299
def main():
    """Run every task-listing demonstration in sequence.

    Loads the configuration, verifies that the service is registered with
    boto3, resolves the agent space ID (configuration first, environment
    second), builds a client from a boto3 session and then calls each
    demonstration function with it.

    Returns:
        None. Stops early when the service is not registered or the client
        cannot be created.
    """
    print("📋 Community DevOps Agent API - Task Listing Examples")
    print("=" * 65)

    # Get configuration
    config = get_config()
    config.print_configuration()

    # Check service registration
    if not devopsagent_api.is_service_registered():
        print("❌ Service registration failed")
        return

    # Get agent space ID
    agent_space_id = config.agent_space_id or get_agent_space_id()
    print(f"Using agent space: {agent_space_id[:8]}...")

    # Create client with direct AWS SigV4 authentication
    try:
        # Create a session with explicit credentials taken from the
        # environment; unset variables are passed through as None.
        session = boto3.Session(
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            aws_session_token=os.getenv('AWS_SESSION_TOKEN'),
            region_name=config.region
        )

        # Create boto3 client with explicit session for direct AWS SigV4 authentication
        client = session.client('community-devops-agent')
        print("✅ Client created successfully with direct AWS SigV4 authentication")
    except Exception as e:
        print(f"❌ Failed to create client: {e}")
        return

    # Run demonstrations
    demonstrations = (
        list_all_tasks,
        list_tasks_with_filters,
        demonstrate_pagination,
        demonstrate_sorting,
        demonstrate_error_handling,
        performance_comparison,
    )
    for demonstration in demonstrations:
        demonstration(client, agent_space_id)

    print("\n🎉 Task listing demonstration complete!")
    print("\n💡 Key Takeaways:")
    print("• Use filters to reduce data transfer and improve performance")
    print("• Prefer server-side filtering over client-side when possible")
    print("• Use pagination for large result sets")
    print("• Sort by priority for urgent task identification")
    print("• Handle errors gracefully in production code")

    print("\n📖 Next: Run 'python examples/get_task.py' to learn about individual task operations")


if __name__ == "__main__":
    main()

Get Task

  1#!/usr/bin/env python3
  2"""
  3Individual task operations examples for the Community DevOps Agent API.
  4
  5This script demonstrates how to retrieve, monitor, and update individual tasks
  6using the boto3 integration, including waiters and status monitoring.
  7"""
  8
  9import os
 10import sys
 11import time
 12import boto3
 13import devopsagent_api
 14from devopsagent_api.models import TaskStatus, TaskPriority
 15from devopsagent_api.exceptions import DevOpsAgentError
 16from datetime import datetime
 17from config import get_config
 18
 19
 20def get_agent_space_id():
 21    """Get agent space ID from environment or prompt user."""
 22    agent_space_id = os.getenv('AGENT_SPACE_ID')
 23    if not agent_space_id:
 24        print("❌ AGENT_SPACE_ID environment variable not set")
 25        print("   Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
 26        print("   Or run: AGENT_SPACE_ID=your-uuid python examples/get_task.py")
 27        sys.exit(1)
 28    return agent_space_id
 29
 30
 31def get_task_by_id(client, agent_space_id, task_id, config):
 32    """Retrieve a specific task by ID."""
 33    print(f"\nπŸ” Getting Task: {task_id[:8]}...")
 34    print("-" * 40)
 35
 36    try:
 37        response = client.get_task(
 38            agentSpaceId=agent_space_id,
 39            taskId=task_id
 40        )
 41
 42        task = response.get('task', {})
 43        if task:
 44            print("βœ… Task retrieved successfully")
 45            print(f"   Title: {task['title']}")
 46            print(f"   Status: {task['status']}")
 47            print(f"   Priority: {task['priority']}")
 48            print(f"   Type: {task['taskType']}")
 49            print(f"   Created: {task['createdAt'][:19].replace('T', ' ')}")
 50            print(f"   Updated: {task['updatedAt'][:19].replace('T', ' ')}")
 51
 52            # Show description (truncated)
 53            desc = task.get('description', '')
 54            if desc:
 55                print(f"   Description: {desc[:100]}{'...' if len(desc) > 100 else ''}")
 56
 57            # Show ALL available fields
 58            print("\nπŸ“Š Complete Task Data Fields:")
 59            print("-" * 40)
 60            for key, value in task.items():
 61                if key == 'description' and len(str(value)) > 100:
 62                    print(f"   {key}: {str(value)[:100]}...")
 63                else:
 64                    print(f"   {key}: {value}")
 65
 66            return task
 67        else:
 68            print("❌ Task not found or empty response")
 69            return None
 70
 71    except DevOpsAgentError as e:
 72        print(f"❌ Failed to get task: {e}")
 73        return None
 74
 75
 76def monitor_task_status(client, agent_space_id, task_id, config, max_checks=None):
 77    """Monitor a task's status changes over time."""
 78    print(f"\nπŸ‘€ Monitoring Task Status: {task_id[:8]}...")
 79    print("-" * 45)
 80
 81    if max_checks is None:
 82        max_checks = config.max_checks
 83
 84    previous_status = None
 85
 86    for check in range(max_checks):
 87        try:
 88            response = client.get_task(
 89                agentSpaceId=agent_space_id,
 90                taskId=task_id
 91            )
 92
 93            task = response.get('task', {})
 94            if task:
 95                current_status = task['status']
 96                updated_at = task['updatedAt'][:19].replace('T', ' ')
 97
 98                if current_status != previous_status:
 99                    print(f"   Check {check + 1}: Status = {current_status} (at {updated_at})")
100                    previous_status = current_status
101
102                    # Check if task is in a final state
103                    if current_status in ['COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT']:
104                        print(f"   βœ… Task reached final status: {current_status}")
105                        return task
106                else:
107                    print(f"   Check {check + 1}: Status unchanged = {current_status}")
108
109            else:
110                print(f"   Check {check + 1}: Task not found")
111                return None
112
113        except DevOpsAgentError as e:
114            print(f"   Check {check + 1}: Error - {e}")
115            if "ResourceNotFoundException" in str(e):
116                return None
117
118        # Wait before next check (except on last iteration)
119        if check < max_checks - 1:
120            print("   Waiting 2 seconds...")
121            time.sleep(2)
122
123    print(f"   ⏰ Monitoring complete after {max_checks} checks")
124    return task if 'task' in locals() else None
125
126
def update_task_status(client, agent_space_id, task_id, new_status, reason="Updated via API example"):
    """Set a task's status after validating it against ``TaskStatus``.

    Args:
        client: Client exposing ``update_task``.
        agent_space_id: Identifier of the agent space that owns the task.
        task_id: Identifier of the task to update.
        new_status: Target status; must equal the value of a ``TaskStatus``
            member.
        reason: Accepted for signature compatibility; it is not sent with the
            request.

    Returns:
        The dict stored under ``'task'`` in the response, or None when the
        status is invalid, the response has no task data, or the call raises
        ``DevOpsAgentError``.
    """
    print(f"\n📝 Updating Task Status: {task_id[:8]}...")
    print("-" * 42)

    # Validate status locally so an obviously bad value never hits the API.
    valid_statuses = [s.value for s in TaskStatus]
    if new_status not in valid_statuses:
        print(f"❌ Invalid status '{new_status}'. Valid options: {', '.join(valid_statuses)}")
        return None

    try:
        response = client.update_task(
            agentSpaceId=agent_space_id,
            taskId=task_id,
            taskStatus=new_status
        )
    except DevOpsAgentError as e:
        print(f"❌ Failed to update task: {e}")
        return None

    updated_task = response.get('task', {})
    if not updated_task:
        print("❌ Update failed - no task data returned")
        return None

    print("✅ Task status updated successfully")
    print(f"   New Status: {updated_task['status']}")
    print(f"   Updated At: {updated_task['updatedAt'][:19].replace('T', ' ')}")
    return updated_task
158
159
def initiate_mitigation_plan(client, agent_space_id, task_id, current_version, reason="HUMAN_APPROVE_MITIGATION"):
    """
    Initiate a mitigation plan for a task.

    Sends a single ``update_task`` request that sets the task status to
    PENDING_START, passes the caller's ``current_version`` and stores
    ``reason`` under the ``queueingReason`` metadata key.

    Args:
        client: boto3 client for community-devops-agent
        agent_space_id: Agent space UUID
        task_id: Task UUID
        current_version: Current version of the task for optimistic concurrency
        reason: Reason for mitigation initiation (default: HUMAN_APPROVE_MITIGATION)

    Returns:
        Updated task dict, or None if the response carries no task data or
        the call raises ``DevOpsAgentError``
    """
    print(f"\n🛡️  Initiating Mitigation Plan: {task_id[:8]}...")
    print("-" * 50)

    try:
        response = client.update_task(
            agentSpaceId=agent_space_id,
            taskId=task_id,
            taskStatus="PENDING_START",
            currentVersion=current_version,
            metadata={"queueingReason": reason}
        )
    except DevOpsAgentError as e:
        print(f"❌ Failed to initiate mitigation: {e}")
        return None

    updated_task = response.get('task', {})
    if not updated_task:
        print("❌ Mitigation initiation failed - no task data returned")
        return None

    queueing_reason = updated_task.get('metadata', {}).get('queueingReason', 'N/A')
    print("✅ Mitigation plan initiated successfully")
    print(f"   Task Status: {updated_task['status']}")
    print(f"   Queueing Reason: {queueing_reason}")
    print(f"   Updated At: {updated_task['updatedAt'][:19].replace('T', ' ')}")
    return updated_task
204
205
def _waiter_setting(waiter_config, name):
    """Read one setting from a waiter config given as an object or a dict."""
    if isinstance(waiter_config, dict):
        return waiter_config.get(name, 'N/A')
    return getattr(waiter_config, name, 'N/A')


def demonstrate_waiters(client, agent_space_id, task_id):
    """Look up each task waiter and print its polling configuration.

    No waiting is performed; the waiters are only resolved and described.
    Any exception raised while resolving a waiter is printed and the
    remaining waiters are still processed.

    Args:
        client: Client exposing ``get_waiter(name)``.
        agent_space_id: Agent space identifier (only needed when actually
            waiting; unused by this demonstration).
        task_id: Task identifier, used in the heading.
    """
    print(f"\n⏳ Demonstrating Waiters for Task: {task_id[:8]}...")
    print("-" * 50)

    waiter_types = [
        ('task_completed', 'Task Completion'),
        ('task_failed', 'Task Failure'),
        ('task_started', 'Task Started')
    ]

    for waiter_name, description in waiter_types:
        print(f"\n{description} ({waiter_name}):")

        try:
            waiter = client.get_waiter(waiter_name)
            print(f"   Waiter available: {waiter_name}")

            # Show waiter configuration. botocore exposes it as an object
            # with attributes, so dict-style .get() would raise and wrongly
            # report the waiter as unavailable.
            waiter_config = waiter.config
            print(f"   Max attempts: {_waiter_setting(waiter_config, 'max_attempts')}")
            print(f"   Delay: {_waiter_setting(waiter_config, 'delay')} seconds")

            # Note: We don't actually wait here as it would block the demo
            # In real usage: waiter.wait(agentSpaceId=agent_space_id, taskId=task_id)

        except Exception as e:
            print(f"   Waiter not available: {e}")
234
235
def demonstrate_error_handling(client, agent_space_id):
    """Trigger expected errors from single-task operations and report them.

    Three scenarios are exercised: fetching a task ID that should not exist,
    updating a real task (the first one returned by ``list_tasks``) with an
    invalid status, and fetching with a malformed agent space ID. If a call
    unexpectedly succeeds, a failure line is printed instead.

    Args:
        client: Client exposing ``get_task``, ``list_tasks`` and
            ``update_task``.
        agent_space_id: Valid agent space identifier for scenarios 1 and 2.
    """
    print("\n🚨 Error Handling Examples")
    print("-" * 30)

    # Try to get a non-existent task
    print("\n1. Non-existent Task ID")
    fake_task_id = "00000000-0000-0000-0000-000000000000"
    try:
        client.get_task(
            agentSpaceId=agent_space_id,
            taskId=fake_task_id
        )
        print("   ❌ Should have failed with ResourceNotFoundException")
    except Exception as e:
        print(f"   ✅ Correctly caught error: {type(e).__name__}")
        message = str(e)
        if "ResourceNotFoundException" in message or "not found" in message.lower():
            print("   ✅ Error indicates task not found")

    # Try to update with invalid status
    print("\n2. Invalid Status Update")
    # First get a real task ID to attempt update
    try:
        list_response = client.list_tasks(agentSpaceId=agent_space_id, limit=1)
        tasks = list_response.get('tasks', [])
        if tasks:
            real_task_id = tasks[0]['taskId']
            try:
                client.update_task(
                    agentSpaceId=agent_space_id,
                    taskId=real_task_id,
                    taskStatus="INVALID_STATUS"
                )
                print("   ❌ Should have failed with validation error")
            except DevOpsAgentError as e:
                print(f"   ✅ Correctly caught validation error: {type(e).__name__}")
        else:
            print("   ⚠️  No tasks available to test with")
    except DevOpsAgentError as e:
        print(f"   ❌ Unexpected error during setup: {e}")

    # Invalid agent space ID
    print("\n3. Invalid Agent Space ID")
    try:
        client.get_task(
            agentSpaceId="invalid-uuid",
            taskId=fake_task_id
        )
        print("   ❌ Should have failed with validation error")
    except DevOpsAgentError as e:
        print(f"   ✅ Correctly caught error: {type(e).__name__}")
287
288
def interactive_task_selection(client, agent_space_id):
    """Pick a task to demonstrate with: the most recently created one.

    Lists the five newest tasks, prints them, and selects the first. Despite
    the name, no user input is read.

    Args:
        client: Client exposing ``list_tasks``.
        agent_space_id: Identifier of the agent space to query.

    Returns:
        The ``taskId`` of the first listed task, or None when the agent space
        has no tasks or the listing raises ``DevOpsAgentError``.
    """
    print("\n🎯 Task Selection for Demonstration")
    print("-" * 38)

    try:
        # Get recent tasks
        response = client.list_tasks(
            agentSpaceId=agent_space_id,
            limit=5,
            sortField='CREATED_AT',
            order='DESC'
        )
    except DevOpsAgentError as e:
        print(f"❌ Failed to list tasks: {e}")
        return None

    tasks = response.get('tasks', [])
    if not tasks:
        print("❌ No tasks found in this agent space")
        print("   Please create some tasks first, or check your agent space ID")
        return None

    print("Available tasks (recently created):")
    for i, task in enumerate(tasks, 1):
        # fromisoformat() on older Pythons rejects a trailing 'Z'.
        created = datetime.fromisoformat(task['createdAt'].replace('Z', '+00:00'))
        print(f"   {i}. {task['title'][:40]}... ({task['status']}) - {created.strftime('%Y-%m-%d %H:%M')}")

    # For demo purposes, use the first task
    selected_task = tasks[0]
    print(f"\n📌 Using task: {selected_task['title'][:40]}... (ID: {selected_task['taskId'][:8]}...)")
    return selected_task['taskId']
322
323
def main():
    """Run the individual task operations demonstration from start to finish.

    Loads the shared configuration, checks that the service is registered,
    creates a client, selects a task, and then runs the read-only demos
    (task retrieval, status monitoring, waiters, error handling). Each
    prerequisite failure prints its reason and ends the run early.
    """
    print("πŸ” Community DevOps Agent API - Individual Task Operations")
    print("=" * 65)

    config = get_config()
    config.print_configuration()

    # Nothing below works unless the service is registered
    if not devopsagent_api.is_service_registered():
        print("❌ Service registration failed")
        return

    # Configured agent space wins; otherwise fall back to get_agent_space_id()
    agent_space_id = config.agent_space_id or get_agent_space_id()
    print(f"Using agent space: {agent_space_id[:8]}...")

    try:
        client = config.get_client()
        print("βœ… Client created successfully")
    except Exception as e:
        print(f"❌ Failed to create client: {e}")
        return

    task_id = interactive_task_selection(client, agent_space_id)
    if not task_id:
        print("\n❌ Cannot continue without a task to work with")
        return

    task = get_task_by_id(client, agent_space_id, task_id, config)
    if task:
        final_statuses = ('COMPLETED', 'FAILED', 'CANCELLED')
        if task['status'] in final_statuses:
            print(f"\nπŸ“‹ Task is already in final status: {task['status']}")
        else:
            # Mutating calls are only described, never executed, so real tasks stay untouched
            for hint in (
                "\n⚠️  Note: Status updates are commented out to avoid modifying real tasks",
                "   Uncomment the following lines to test status updates:",
                "   # update_task_status(client, agent_space_id, task_id, 'IN_PROGRESS')",
                "   # initiate_mitigation_plan(client, agent_space_id, task_id, task['version'])",
            ):
                print(hint)

    # The remaining demos only read state, so they run unconditionally
    monitor_task_status(client, agent_space_id, task_id, config, max_checks=3)
    demonstrate_waiters(client, agent_space_id, task_id)
    demonstrate_error_handling(client, agent_space_id)

    closing_lines = (
        "\nπŸŽ‰ Individual task operations demonstration complete!",
        "\nπŸ’‘ Key Takeaways:",
        "β€’ Use GetTask to retrieve complete task information",
        "β€’ Monitor task progress with periodic status checks",
        "β€’ Use waiters for event-driven task completion waiting",
        "β€’ Update task status to reflect progress or resolution",
        "β€’ Handle ResourceNotFoundException for missing tasks",
        "β€’ Validate status values before updates",
        "\nπŸ“– Next Steps:",
        "β€’ Run 'python examples/query_journal.py' for execution tracking",
        "β€’ Run 'python examples/topology_discovery.py' for infrastructure discovery",
        "β€’ Check examples/README.md for more usage patterns",
    )
    for text in closing_lines:
        print(text)


# Run the demonstration only when executed as a script
if __name__ == "__main__":
    main()

Interactive Monitoring

Basic interactive monitoring:

  1#!/usr/bin/env python3
  2"""
  3Timed Interactive Agent Monitoring Demo
  4
  5This example demonstrates monitoring with timed intervention messages:
  61. Full JSON parsing of all record types
  72. Rich formatted output with all details
  83. Configurable verbosity levels
  94. Support for all record types (message, activity, observation, etc.)
 105. Timed intervention messages at specific intervals
 11
 12The script shows EVERYTHING - all records, parsed JSON, metadata, etc.
 13"""
 14
 15import devopsagent_api
 16import boto3
 17from botocore.exceptions import ClientError, WaiterError
 18import time
 19from datetime import datetime
 20import os
 21import json
 22from typing import Dict, List, Any, Optional, Tuple
 23from config import get_config
 24
 25
 26class TimedAgentMonitor:
 27    """Monitor an active agent investigation with timed intervention messages."""
 28
 29    def __init__(self, client, agent_space_id, user_id='interactive-user',
 30                 show_json=False, show_thinking=True, verbose=True):
 31        self.client = client
 32        self.agent_space_id = agent_space_id
 33        self.user_id = user_id
 34        self.seen_record_ids = set()
 35        self.record_count = 0
 36        self.message_count = 0
 37        self.interventions_sent = set()  # Track which interventions have been sent
 38
 39        # Display options
 40        self.show_json = show_json
 41        self.show_thinking = show_thinking
 42        self.verbose = verbose
 43
 44        # Record type emojis
 45        self.type_emojis = {
 46            'message': 'πŸ’¬',
 47            'activity': 'βš™οΈ',
 48            'observation': 'πŸ‘οΈ',
 49            'symptom': '⚠️',
 50            'finding': 'πŸ”',
 51            'plan': 'πŸ“‹',
 52            'investigation_summary': 'πŸ“Š'
 53        }
 54
 55    def create_task(self, title, description):
 56        """Create a new investigation task."""
 57        print("\n" + "="*80)
 58        print("CREATING INVESTIGATION TASK")
 59        print("="*80)
 60
 61        try:
 62            response = self.client.create_task(
 63                agentSpaceId=self.agent_space_id,
 64                title=title,
 65                description=description,
 66                taskType='INVESTIGATION',
 67                priority='HIGH'
 68            )
 69
 70            task = response['task']
 71            task_id = task['taskId']
 72
 73            print(f"βœ“ Task created successfully!")
 74            print(f"  Task ID: {task_id}")
 75            print(f"  Title: {title}")
 76            print(f"  Status: {task['status']}")
 77            print(f"  Priority: {task['priority']}")
 78
 79            return task_id
 80
 81        except ClientError as e:
 82            print(f"βœ— Error creating task: {e}")
 83            return None
 84
 85    def wait_for_task_start(self, task_id, timeout=60):
 86        """Wait for task to start using waiter."""
 87        print(f"\n{'─'*80}")
 88        print("WAITING FOR AGENT TO START")
 89        print(f"{'─'*80}")
 90
 91        try:
 92            waiter = self.client.get_waiter('task_started')
 93
 94            print(f"⏳ Waiting for task to start (timeout: {timeout}s)...")
 95
 96            waiter.wait(
 97                agentSpaceId=self.agent_space_id,
 98                taskId=task_id,
 99                WaiterConfig={
100                    'Delay': 2,
101                    'MaxAttempts': timeout // 2
102                }
103            )
104
105            print(f"βœ“ Agent has started working on the task!")
106            return True
107
108        except WaiterError as e:
109            print(f"βœ— Timeout waiting for task to start: {e}")
110            return False
111        except ClientError as e:
112            print(f"βœ— Error: {e}")
113            return False
114
115    def get_execution_id_from_chat(self, task_id):
116        """Get the execution ID by sending a chat message to the task."""
117        try:
118            print(f"  Getting execution ID by sending initial chat message to task {task_id}")
119
120            # Send an initial chat message to create/get the chat execution
121            # Note: executionId is not required for the first message
122            response = self.client.send_chat_message(
123                agentSpaceId=self.agent_space_id,
124                taskId=task_id,
125                userId=self.user_id
126            )
127
128            # Extract execution ID from response (should be at top level)
129            execution_id = response.get('executionId')
130            if execution_id:
131                print(f"  βœ“ Got execution ID from chat response: {execution_id}")
132                return execution_id
133
134            # Fallback: check if it's in the messages
135            messages = response.get('messages', [])
136            print(f"  ⚠️ No executionId at top level, checking messages: {messages}")
137
138            print(f"  βœ— No execution ID found in chat response")
139            print(f"    Full response: {response}")
140            return None
141
142        except ClientError as e:
143            print(f"βœ— Error sending initial chat message: {e}")
144            return None
145
146    def send_message(self, task_id, execution_id, message, intervention_id=None):
147        """Send a chat message to the agent."""
148        try:
149            # Debug: show the request parameters
150            print(f"\nπŸ” DEBUG - SendChatMessage Request:")
151            print(f"   agentSpaceId: {self.agent_space_id}")
152            print(f"   taskId: {task_id}")
153            print(f"   executionId: {execution_id}")
154            print(f"   content: {message}")
155            print(f"   userId: {self.user_id}")
156            print()
157
158            response = self.client.send_chat_message(
159                agentSpaceId=self.agent_space_id,
160                taskId=task_id,
161                executionId=execution_id,
162                content=message,
163                userId=self.user_id
164            )
165
166            intervention_label = f" (#{intervention_id + 1})" if intervention_id is not None else ""
167            print(f"\n{'β”Œ'+'─'*78+'┐'}")
168            print(f"β”‚ πŸ’¬ TIMED USER INTERVENTION{intervention_label}{' '*(58-len(intervention_label))}β”‚")
169            print(f"β”‚{' '*78}β”‚")
170            print(f"β”‚ Message: {message[:64]:<64}β”‚")
171            if len(message) > 64:
172                print(f"β”‚          {message[64:]:<64}β”‚")
173            print(f"β”‚ Message ID: {response['messages'][0]['messageId']:<59}β”‚")
174            print(f"β””{'─'*78}β”˜\n")
175
176            if intervention_id is not None:
177                self.interventions_sent.add(intervention_id)
178            return True
179
180        except ClientError as e:
181            print(f"\nβœ— Error sending message: {e}\n")
182            return False
183
184    def fetch_new_records(self, execution_id):
185        """Fetch new journal records (all types)."""
186        try:
187            response = self.client.get_journal_records(
188                agentSpaceId=self.agent_space_id,
189                executionId=execution_id,
190                order='ASC',
191                limit=100
192            )
193
194            records = response.get('records', [])
195
196            # Filter out records we've already seen
197            new_records = [
198                r for r in records
199                if r['recordId'] not in self.seen_record_ids
200            ]
201
202            # Mark as seen
203            for record in new_records:
204                self.seen_record_ids.add(record['recordId'])
205
206            return new_records
207
208        except ClientError as e:
209            # Silently handle errors during polling
210            return []
211
212    def parse_message_content(self, content_str: str) -> Optional[Dict]:
213        """Parse JSON message content."""
214        try:
215            return json.loads(content_str)
216        except (json.JSONDecodeError, TypeError):
217            return None
218
219    def extract_text_from_content(self, content_data: Dict) -> List[str]:
220        """Extract readable text from parsed message content."""
221        texts = []
222
223        if not isinstance(content_data, dict):
224            return texts
225
226        # Extract from content array
227        content_array = content_data.get('content', [])
228        if isinstance(content_array, list):
229            for item in content_array:
230                if isinstance(item, dict):
231                    # Text content
232                    if 'text' in item:
233                        texts.append(('text', item['text']))
234
235                    # Thinking content
236                    if 'thinking' in item and self.show_thinking:
237                        texts.append(('thinking', item['thinking']))
238
239                    # Tool use
240                    if 'toolUse' in item:
241                        tool_use = item['toolUse']
242                        tool_name = tool_use.get('name', 'unknown')
243                        texts.append(('tool_use', f"Tool: {tool_name}"))
244
245                    # Tool result
246                    if 'toolResult' in item:
247                        tool_result = item['toolResult']
248                        tool_id = tool_result.get('toolUseId', 'unknown')
249                        texts.append(('tool_result', f"Tool Result: {tool_id}"))
250
251        return texts
252
253    def format_timestamp(self, timestamp: str) -> str:
254        """Format timestamp for display."""
255        if not timestamp or timestamp == 'N/A':
256            return 'N/A'
257
258        try:
259            dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
260            return dt.strftime('%H:%M:%S')
261        except:
262            return timestamp
263
264    def display_record(self, record: Dict):
265        """Display a journal record with full parsing."""
266        self.record_count += 1
267
268        record_id = record.get('recordId', 'unknown')
269        record_type = record.get('recordType', 'unknown')
270        timestamp = self.format_timestamp(record.get('timestamp', 'N/A'))
271        content = record.get('content', '')
272
273        emoji = self.type_emojis.get(record_type, 'πŸ“„')
274
275        # Header
276        print(f"\n{'β”Œ'+'─'*78+'┐'}")
277        print(f"β”‚ {emoji} RECORD #{self.record_count} - {record_type.upper():<60} β”‚")
278        print(f"β”‚ Time: {timestamp:<71} β”‚")
279        print(f"β”‚ ID: {record_id[:72]:<72} β”‚")
280        print(f"β”œ{'─'*78}─")
281
282        # Parse content if it's a message
283        if record_type == 'message':
284            self.message_count += 1
285            parsed = self.parse_message_content(content)
286
287            if parsed:
288                # Show role
289                role = parsed.get('role', 'unknown')
290                print(f"β”‚ Role: {role:<72} β”‚")
291
292                # Extract and display text content
293                texts = self.extract_text_from_content(parsed)
294
295                if texts:
296                    print(f"β”œ{'─'*78}─")
297                    for content_type, text in texts:
298                        # Format content type
299                        type_label = content_type.replace('_', ' ').title()
300                        print(f"β”‚ [{type_label}]:{' '*(70-len(type_label))} β”‚")
301
302                        # Word wrap text
303                        words = text.split()
304                        line = ""
305                        for word in words:
306                            if len(line) + len(word) + 1 <= 74:
307                                line += (word + " ")
308                            else:
309                                print(f"β”‚ {line:<76} β”‚")
310                                line = word + " "
311                        if line:
312                            print(f"β”‚ {line:<76} β”‚")
313
314                # Show raw JSON if requested
315                if self.show_json:
316                    print(f"β”œ{'─'*78}─")
317                    print(f"β”‚ [Raw JSON]:{' '*66} β”‚")
318                    json_str = json.dumps(parsed, indent=2)
319                    for line in json_str.split('\n')[:10]:  # Limit to 10 lines
320                        print(f"β”‚ {line[:76]:<76} β”‚")
321                    if len(json_str.split('\n')) > 10:
322                        print(f"β”‚ ... (truncated){' '*62} β”‚")
323            else:
324                # Show raw content if not parseable
325                print(f"β”‚ Content: {content[:68]:<68} β”‚")
326                if len(content) > 68:
327                    print(f"β”‚ ... (truncated){' '*62} β”‚")
328        else:
329            # Non-message records
330            print(f"β”‚ Content: {content[:68]:<68} β”‚")
331            if len(content) > 68:
332                remaining = content[68:]
333                while remaining:
334                    chunk = remaining[:76]
335                    print(f"β”‚ {chunk:<76} β”‚")
336                    remaining = remaining[76:]
337
338        print(f"β””{'─'*78}β”˜")
339
340    def check_task_status(self, task_id):
341        """Check if task is completed."""
342        try:
343            response = self.client.get_task(
344                agentSpaceId=self.agent_space_id,
345                taskId=task_id
346            )
347
348            task = response['task']
349            status = task['status']
350
351            return status in ['COMPLETED', 'FAILED', 'CANCELED', 'CANCELLED']
352
353        except ClientError:
354            return False
355
356    def monitor_task(self, task_id, execution_id, timed_interventions: List[Tuple[float, str]],
357                     poll_interval=2, max_duration=300):
358        """Monitor task execution with timed intervention messages."""
359        print(f"\n{'='*80}")
360        print("MONITORING AGENT ACTIVITY (TIMED INTERVENTIONS)")
361        print(f"{'='*80}")
362        print(f"πŸ“Š Configuration:")
363        print(f"   β€’ Poll interval: {poll_interval}s")
364        print(f"   β€’ Max duration: {max_duration}s")
365        print(f"   β€’ Timed interventions: {len(timed_interventions)}")
366        for i, (delay, msg) in enumerate(timed_interventions, 1):
367            print(f"     {i}. After {delay}s: '{msg[:50]}{'...' if len(msg) > 50 else ''}'")
368        print(f"   β€’ Show JSON: {self.show_json}")
369        print(f"   β€’ Show thinking: {self.show_thinking}")
370        print(f"   β€’ Verbose: {self.verbose}")
371        print(f"{'─'*80}")
372
373        start_time = time.time()
374
375        while True:
376            # Check if we've exceeded max duration
377            elapsed = time.time() - start_time
378            if elapsed > max_duration:
379                print(f"\n⏱️  Max duration ({max_duration}s) reached. Stopping monitor.")
380                break
381
382            # Send timed intervention messages (only once per intervention)
383            for intervention_id, (delay_seconds, message) in enumerate(timed_interventions):
384                if (elapsed >= delay_seconds and
385                    intervention_id not in self.interventions_sent):
386                    self.send_message(task_id, execution_id, message, intervention_id)
387                    self.interventions_sent.add(intervention_id)
388
389            # Fetch new records
390            new_records = self.fetch_new_records(execution_id)
391
392            # Display new records
393            for record in new_records:
394                self.display_record(record)
395
396            # Check if task is completed (but don't exit - continue for timed interventions)
397            task_completed = self.check_task_status(task_id)
398            if task_completed and not hasattr(self, '_task_completion_logged'):
399                print(f"\n{'─'*80}")
400                print("βœ… TASK COMPLETED")
401                print(f"{'─'*80}")
402                self._task_completion_logged = True
403
404            # Wait before next poll
405            time.sleep(poll_interval)
406
407        # Final summary
408        duration = time.time() - start_time
409        interventions_completed = len(self.interventions_sent)
410        print(f"\n{'='*80}")
411        print("MONITORING SUMMARY")
412        print(f"{'='*80}")
413        print(f"πŸ“Š Statistics:")
414        print(f"   β€’ Total records: {self.record_count}")
415        print(f"   β€’ Message records: {self.message_count}")
416        print(f"   β€’ Interventions sent: {interventions_completed}/{len(timed_interventions)}")
417        print(f"   β€’ Duration: {duration:.1f}s")
418        print(f"{'='*80}\n")
419
420
def _create_started_task(monitor, title, description):
    """Create a task and wait for the agent to start it; return its ID or None."""
    task_id = monitor.create_task(title, description)
    if not task_id:
        print("Failed to create investigation task")
        return None

    if not monitor.wait_for_task_start(task_id):
        print("Failed to wait for task to start")
        return None

    return task_id


def main():
    """Run the timed interactive monitoring demo.

    Reads settings from the shared configuration and these environment
    variables: ``AGENT_SPACE_ID`` (fallback agent space),
    ``CREATE_NEW_INVESTIGATION`` (``true`` to always create a task),
    ``SHOW_JSON``, ``SHOW_THINKING`` and ``VERBOSE`` (display switches).

    Unless a new investigation is requested, the first task returned by
    ``list_tasks`` is reused; a new task is created only when none exists or
    listing fails. An execution ID is then obtained via an initial chat
    message and the task is monitored with three timed interventions.
    """

    # Get configuration
    config = get_config()
    config.print_configuration()

    # Configuration - can be overridden with environment variables
    agent_space_id = config.agent_space_id or os.getenv('AGENT_SPACE_ID')
    user_id = config.user_id
    region = config.region
    create_new_investigation = os.getenv('CREATE_NEW_INVESTIGATION', 'false').lower() == 'true'

    # Without an agent space every subsequent API call would be made with
    # None, so stop here with a clear message instead.
    if not agent_space_id:
        print("No agent space ID configured. Set AGENT_SPACE_ID or update the configuration.")
        return

    # Timed interventions: (delay_seconds, message)
    timed_interventions = [
        (2.0, "Please investigate the high memory usage issue on our production servers"),
        (6.0, "The issue started around 3 PM today and affects multiple EC2 instances in us-east-1"),
        (10.0, "Customers are reporting slow response times, please prioritize this investigation")
    ]

    # Monitoring configuration
    poll_interval = 1  # Poll every 1 second
    max_duration = 15  # Stop after 15 seconds for demo (last intervention fires at 10s)

    # Display options
    show_json = os.getenv('SHOW_JSON', 'false').lower() == 'true'
    show_thinking = os.getenv('SHOW_THINKING', 'true').lower() == 'true'
    verbose = os.getenv('VERBOSE', 'true').lower() == 'true'

    if create_new_investigation:
        subtitle = "(Creating new investigation)"
    else:
        subtitle = "(Using existing investigation)"

    print("\n" + "="*80)
    print(" "*20 + "TIMED AGENT MONITORING DEMO")
    print(" "*15 + subtitle)
    print("="*80)
    print(f"Agent Space ID: {agent_space_id}")
    print(f"User ID: {user_id}")
    print()

    # Create boto3 client
    client = boto3.client('community-devops-agent', region_name=region)

    # Create monitor
    monitor = TimedAgentMonitor(
        client, agent_space_id, user_id,
        show_json=show_json,
        show_thinking=show_thinking,
        verbose=verbose
    )

    task_id = None
    need_new_task = create_new_investigation
    title = "High Memory Usage Investigation"
    description = "Investigate high memory usage on production servers affecting customer experience"

    if create_new_investigation:
        print("Creating new investigation task...")
    else:
        # Find existing task to use for monitoring
        print("Finding existing task for monitoring...")
        # Both fallbacks below create the same generic demo task
        title = "Timed Interactive Demo"
        description = "Demonstration with timed intervention messages"
        try:
            response = client.list_tasks(
                agentSpaceId=agent_space_id,
                limit=10
            )
        except ClientError as e:
            print(f"Error listing tasks: {e}")
            print("Creating a new investigation task as fallback...")
            need_new_task = True
        else:
            tasks = response.get('tasks', [])
            if tasks:
                # Use the first existing task
                first_task = tasks[0]
                task_id = first_task['taskId']
                print(f"Using existing task: {task_id}")
                print(f"  Type: {first_task.get('taskType', 'unknown')}")
                print(f"  Status: {first_task.get('status', 'unknown')}")
            else:
                print("No existing tasks found. Creating a new investigation task...")
                need_new_task = True

    if need_new_task:
        task_id = _create_started_task(monitor, title, description)
        if not task_id:
            return

    # Get execution ID by sending initial chat message
    execution_id = monitor.get_execution_id_from_chat(task_id)

    if not execution_id:
        print("Failed to get execution ID from chat")
        return

    # Monitor task with timed interventions
    monitor.monitor_task(
        task_id=task_id,
        execution_id=execution_id,
        timed_interventions=timed_interventions,
        poll_interval=poll_interval,
        max_duration=max_duration
    )

    print("✨ Timed monitoring demo completed!")
    print(f"   Task ID: {task_id}")
    print(f"   Execution ID: {execution_id}")
    print(f"   Total records displayed: {monitor.record_count}")
    print(f"   Message records: {monitor.message_count}")
    print(f"   Interventions sent: {len(monitor.interventions_sent)}/{len(timed_interventions)}\n")


# Run the demo only when executed as a script
if __name__ == '__main__':
    main()

Enhanced interactive monitoring:

  1#!/usr/bin/env python3
  2"""
  3Enhanced Interactive Agent Monitoring Example
  4
  5This example demonstrates comprehensive monitoring with:
  61. Full JSON parsing of all record types
  72. Rich formatted output with all details
  83. Configurable verbosity levels
  94. Support for all record types (message, activity, observation, etc.)
 105. Interactive intervention with message injection
 11
 12The script shows EVERYTHING - all records, parsed JSON, metadata, etc.
 13"""
 14
 15import devopsagent_api
 16import boto3
 17from botocore.exceptions import ClientError, WaiterError
 18import time
 19from datetime import datetime
 20import os
 21import json
 22from typing import Dict, List, Any, Optional
 23from config import get_config
 24
 25
 26class EnhancedAgentMonitor:
 27    """Monitor an active agent investigation with comprehensive output."""
 28
 29    def __init__(self, client, agent_space_id, user_id='interactive-user',
 30                 show_json=False, show_thinking=True, verbose=True):
 31        self.client = client
 32        self.agent_space_id = agent_space_id
 33        self.user_id = user_id
 34        self.seen_record_ids = set()
 35        self.record_count = 0
 36        self.message_count = 0
 37        self.intervention_sent = False
 38
 39        # Display options
 40        self.show_json = show_json
 41        self.show_thinking = show_thinking
 42        self.verbose = verbose
 43
 44        # Record type emojis
 45        self.type_emojis = {
 46            'message': 'πŸ’¬',
 47            'activity': 'βš™οΈ',
 48            'observation': 'πŸ‘οΈ',
 49            'symptom': '⚠️',
 50            'finding': 'πŸ”',
 51            'plan': 'πŸ“‹',
 52            'investigation_summary': 'πŸ“Š'
 53        }
 54
 55    def create_task(self, title, description):
 56        """Create a new investigation task."""
 57        print("\n" + "="*80)
 58        print("CREATING INVESTIGATION TASK")
 59        print("="*80)
 60
 61        try:
 62            response = self.client.create_task(
 63                agentSpaceId=self.agent_space_id,
 64                title=title,
 65                description=description,
 66                taskType='INVESTIGATION',
 67                priority='HIGH'
 68            )
 69
 70            task = response['task']
 71            task_id = task['taskId']
 72
 73            print(f"βœ“ Task created successfully!")
 74            print(f"  Task ID: {task_id}")
 75            print(f"  Title: {title}")
 76            print(f"  Status: {task['status']}")
 77            print(f"  Priority: {task['priority']}")
 78
 79            return task_id
 80
 81        except ClientError as e:
 82            print(f"βœ— Error creating task: {e}")
 83            return None
 84
 85    def wait_for_task_start(self, task_id, timeout=60):
 86        """Wait for task to start using waiter."""
 87        print(f"\n{'─'*80}")
 88        print("WAITING FOR AGENT TO START")
 89        print(f"{'─'*80}")
 90
 91        try:
 92            waiter = self.client.get_waiter('task_started')
 93
 94            print(f"⏳ Waiting for task to start (timeout: {timeout}s)...")
 95
 96            waiter.wait(
 97                agentSpaceId=self.agent_space_id,
 98                taskId=task_id,
 99                WaiterConfig={
100                    'Delay': 2,
101                    'MaxAttempts': timeout // 2
102                }
103            )
104
105            print(f"βœ“ Agent has started working on the task!")
106            return True
107
108        except WaiterError as e:
109            print(f"βœ— Timeout waiting for task to start: {e}")
110            return False
111        except ClientError as e:
112            print(f"βœ— Error: {e}")
113            return False
114
115    def get_execution_id(self, task_id):
116        """Get the execution ID for the task."""
117        try:
118            response = self.client.list_executions(
119                agentSpaceId=self.agent_space_id,
120                taskId=task_id,
121                limit=1
122            )
123
124            executions = response.get('executions', [])
125            if executions:
126                execution_id = executions[0]['executionId']
127                print(f"  Execution ID: {execution_id}")
128                return execution_id
129
130            return None
131
132        except ClientError as e:
133            print(f"βœ— Error getting execution: {e}")
134            return None
135
136    def send_message(self, task_id, execution_id, message):
137        """Send a chat message to the agent."""
138        try:
139            response = self.client.send_chat_message(
140                agentSpaceId=self.agent_space_id,
141                taskId=task_id,
142                executionId=execution_id,
143                content=message,
144                userId=self.user_id
145            )
146
147            print(f"\n{'β”Œ'+'─'*78+'┐'}")
148            print(f"β”‚ πŸ’¬ USER INTERVENTION{' '*58}β”‚")
149            print(f"β”‚{' '*78}β”‚")
150            print(f"β”‚ Message: {message[:64]:<64}β”‚")
151            if len(message) > 64:
152                print(f"β”‚          {message[64:]:<64}β”‚")
153            print(f"β”‚ Message ID: {response['messages'][0]['messageId']:<59}β”‚")
154            print(f"β””{'─'*78}β”˜\n")
155
156            self.intervention_sent = True
157            return True
158
159        except ClientError as e:
160            print(f"\nβœ— Error sending message: {e}\n")
161            return False
162
163    def fetch_new_records(self, execution_id):
164        """Fetch new journal records (all types)."""
165        try:
166            response = self.client.get_journal_records(
167                agentSpaceId=self.agent_space_id,
168                executionId=execution_id,
169                order='ASC',
170                limit=100
171            )
172
173            records = response.get('records', [])
174
175            # Filter out records we've already seen
176            new_records = [
177                r for r in records
178                if r['recordId'] not in self.seen_record_ids
179            ]
180
181            # Mark as seen
182            for record in new_records:
183                self.seen_record_ids.add(record['recordId'])
184
185            return new_records
186
187        except ClientError as e:
188            # Silently handle errors during polling
189            return []
190
191    def parse_message_content(self, content_str: str) -> Optional[Dict]:
192        """Parse JSON message content."""
193        try:
194            return json.loads(content_str)
195        except (json.JSONDecodeError, TypeError):
196            return None
197
198    def extract_text_from_content(self, content_data: Dict) -> List[str]:
199        """Extract readable text from parsed message content."""
200        texts = []
201
202        if not isinstance(content_data, dict):
203            return texts
204
205        # Extract from content array
206        content_array = content_data.get('content', [])
207        if isinstance(content_array, list):
208            for item in content_array:
209                if isinstance(item, dict):
210                    # Text content
211                    if 'text' in item:
212                        texts.append(('text', item['text']))
213
214                    # Thinking content
215                    if 'thinking' in item and self.show_thinking:
216                        texts.append(('thinking', item['thinking']))
217
218                    # Tool use
219                    if 'toolUse' in item:
220                        tool_use = item['toolUse']
221                        tool_name = tool_use.get('name', 'unknown')
222                        texts.append(('tool_use', f"Tool: {tool_name}"))
223
224                    # Tool result
225                    if 'toolResult' in item:
226                        tool_result = item['toolResult']
227                        tool_id = tool_result.get('toolUseId', 'unknown')
228                        texts.append(('tool_result', f"Tool Result: {tool_id}"))
229
230        return texts
231
232    def format_timestamp(self, timestamp: str) -> str:
233        """Format timestamp for display."""
234        if not timestamp or timestamp == 'N/A':
235            return 'N/A'
236
237        try:
238            dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
239            return dt.strftime('%H:%M:%S')
240        except:
241            return timestamp
242
243    def display_record(self, record: Dict):
244        """Display a journal record with full parsing."""
245        self.record_count += 1
246
247        record_id = record.get('recordId', 'unknown')
248        record_type = record.get('recordType', 'unknown')
249        timestamp = self.format_timestamp(record.get('timestamp', 'N/A'))
250        content = record.get('content', '')
251
252        emoji = self.type_emojis.get(record_type, 'πŸ“„')
253
254        # Header
255        print(f"\n{'β”Œ'+'─'*78+'┐'}")
256        print(f"β”‚ {emoji} RECORD #{self.record_count} - {record_type.upper():<60} β”‚")
257        print(f"β”‚ Time: {timestamp:<71} β”‚")
258        print(f"β”‚ ID: {record_id[:72]:<72} β”‚")
259        print(f"β”œ{'─'*78}─")
260
261        # Parse content if it's a message
262        if record_type == 'message':
263            self.message_count += 1
264            parsed = self.parse_message_content(content)
265
266            if parsed:
267                # Show role
268                role = parsed.get('role', 'unknown')
269                print(f"β”‚ Role: {role:<72} β”‚")
270
271                # Extract and display text content
272                texts = self.extract_text_from_content(parsed)
273
274                if texts:
275                    print(f"β”œ{'─'*78}─")
276                    for content_type, text in texts:
277                        # Format content type
278                        type_label = content_type.replace('_', ' ').title()
279                        print(f"β”‚ [{type_label}]:{' '*(70-len(type_label))} β”‚")
280
281                        # Word wrap text
282                        words = text.split()
283                        line = ""
284                        for word in words:
285                            if len(line) + len(word) + 1 <= 74:
286                                line += (word + " ")
287                            else:
288                                print(f"β”‚ {line:<76} β”‚")
289                                line = word + " "
290                        if line:
291                            print(f"β”‚ {line:<76} β”‚")
292
293                # Show raw JSON if requested
294                if self.show_json:
295                    print(f"β”œ{'─'*78}─")
296                    print(f"β”‚ [Raw JSON]:{' '*66} β”‚")
297                    json_str = json.dumps(parsed, indent=2)
298                    for line in json_str.split('\n')[:10]:  # Limit to 10 lines
299                        print(f"β”‚ {line[:76]:<76} β”‚")
300                    if len(json_str.split('\n')) > 10:
301                        print(f"β”‚ ... (truncated){' '*62} β”‚")
302            else:
303                # Show raw content if not parseable
304                print(f"β”‚ Content: {content[:68]:<68} β”‚")
305                if len(content) > 68:
306                    print(f"β”‚ ... (truncated){' '*62} β”‚")
307        else:
308            # Non-message records
309            print(f"β”‚ Content: {content[:68]:<68} β”‚")
310            if len(content) > 68:
311                remaining = content[68:]
312                while remaining:
313                    chunk = remaining[:76]
314                    print(f"β”‚ {chunk:<76} β”‚")
315                    remaining = remaining[76:]
316
317        print(f"β””{'─'*78}β”˜")
318
319    def check_task_status(self, task_id):
320        """Check if task is completed."""
321        try:
322            response = self.client.get_task(
323                agentSpaceId=self.agent_space_id,
324                taskId=task_id
325            )
326
327            task = response['task']
328            status = task['status']
329
330            return status in ['COMPLETED', 'FAILED', 'CANCELED', 'CANCELLED']
331
332        except ClientError:
333            return False
334
335    def monitor_task(self, task_id, execution_id, intervention_message,
336                     intervention_after=5, poll_interval=2, max_duration=300):
337        """Monitor task execution with comprehensive output."""
338        print(f"\n{'='*80}")
339        print("MONITORING AGENT ACTIVITY (ENHANCED MODE)")
340        print(f"{'='*80}")
341        print(f"πŸ“Š Configuration:")
342        print(f"   β€’ Poll interval: {poll_interval}s")
343        print(f"   β€’ Intervention after: {intervention_after} messages")
344        print(f"   β€’ Max duration: {max_duration}s")
345        print(f"   β€’ Show JSON: {self.show_json}")
346        print(f"   β€’ Show thinking: {self.show_thinking}")
347        print(f"   β€’ Verbose: {self.verbose}")
348        print(f"{'─'*80}")
349
350        start_time = time.time()
351
352        while True:
353            # Check if we've exceeded max duration
354            elapsed = time.time() - start_time
355            if elapsed > max_duration:
356                print(f"\n⏱️  Max duration ({max_duration}s) reached. Stopping monitor.")
357                break
358
359            # Fetch new records
360            new_records = self.fetch_new_records(execution_id)
361
362            # Display new records
363            for record in new_records:
364                self.display_record(record)
365
366            # Send intervention message after N messages
367            if (self.message_count >= intervention_after and
368                not self.intervention_sent):
369                self.send_message(task_id, execution_id, intervention_message)
370
371            # Check if task is completed
372            if self.check_task_status(task_id):
373                print(f"\n{'─'*80}")
374                print("βœ… TASK COMPLETED")
375                print(f"{'─'*80}")
376                break
377
378            # Wait before next poll
379            time.sleep(poll_interval)
380
381        # Final summary
382        duration = time.time() - start_time
383        print(f"\n{'='*80}")
384        print("MONITORING SUMMARY")
385        print(f"{'='*80}")
386        print(f"πŸ“Š Statistics:")
387        print(f"   β€’ Total records: {self.record_count}")
388        print(f"   β€’ Message records: {self.message_count}")
389        print(f"   β€’ Intervention sent: {'Yes' if self.intervention_sent else 'No'}")
390        print(f"   β€’ Duration: {duration:.1f}s")
391        print(f"{'='*80}\n")
392
393
def main():
    """Run the enhanced interactive monitoring example.

    Loads the example configuration, creates a task describing a memory
    investigation, waits up to 60 seconds for it to start, looks up its
    execution ID and then monitors it, passing along a follow-up message to
    send after 5 agent messages.  Returns early, after printing a message,
    when any of the first three steps fails.
    """

    def env_flag(name, default):
        # A flag is on only when the variable equals "true" in any casing.
        return os.getenv(name, default).lower() == 'true'

    # Get configuration
    settings = get_config()
    settings.print_configuration()

    # The configured agent space wins; the environment is the fallback.
    space_id = settings.agent_space_id or os.getenv('AGENT_SPACE_ID')
    acting_user = settings.user_id

    # Task configuration
    title = 'Investigate high memory usage on production servers'
    description = '''
    Production servers are showing consistently high memory usage (>85%)
    over the past 2 hours. Please investigate:
    1. Which processes are consuming the most memory
    2. Any memory leaks or unusual patterns
    3. Recommendations for remediation
    '''

    # Monitoring configuration
    monitor_options = {
        'intervention_message': "Can you also check if there are any disk I/O bottlenecks?",
        'intervention_after': 5,  # Send message after 5 agent messages
        'poll_interval': 2,       # Poll every 2 seconds
        'max_duration': 300,      # Stop after 5 minutes
    }

    # Display options
    display_options = {
        'show_json': env_flag('SHOW_JSON', 'false'),
        'show_thinking': env_flag('SHOW_THINKING', 'true'),
        'verbose': env_flag('VERBOSE', 'true'),
    }

    print("\n" + "="*80)
    print(" "*20 + "ENHANCED AGENT MONITORING")
    print("="*80)

    # Create boto3 client and the monitor that wraps it
    monitor = EnhancedAgentMonitor(
        settings.get_client(), space_id, acting_user,
        **display_options
    )

    # Step 1: Create task
    task_id = monitor.create_task(title, description)
    if not task_id:
        print("\nβœ— Failed to create task. Exiting.")
        return

    # Step 2: Wait for task to start
    started = monitor.wait_for_task_start(task_id, timeout=60)
    if not started:
        print("\n⚠️  Task did not start in time. You may need to check the task manually.")
        return

    # Step 3: Get execution ID
    execution_id = monitor.get_execution_id(task_id)
    if not execution_id:
        print("\nβœ— Could not get execution ID. Exiting.")
        return

    # Step 4: Monitor task with intervention
    monitor.monitor_task(
        task_id=task_id,
        execution_id=execution_id,
        **monitor_options
    )

    print("✨ Enhanced monitoring completed!")
    print(f"   Task ID: {task_id}")
    print(f"   Total records displayed: {monitor.record_count}")
    print(f"   Message records: {monitor.message_count}\n")
472
473
# Run the example only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

Knowledge Base Management

  1#!/usr/bin/env python3
  2"""
  3Community DevOps Agent - Knowledgebase Management Example
  4
  5This example demonstrates comprehensive knowledgebase management capabilities
  6for the Community DevOps Agent API, including:
  7
  8- Creating knowledge items (runbooks, documentation, procedures, guides)
  9- Listing and searching knowledge items with filtering
 10- Retrieving specific knowledge items
 11- Updating knowledge content and metadata
 12- Managing lifecycle status (DRAFT → REVIEW → PUBLISHED → ARCHIVED)
 13- Deleting knowledge items
 14- Interactive management interface
 15
 16Knowledge items support multiple content types and can store operational
 17procedures, troubleshooting guides, best practices, and reference materials.
 18
 19Usage:
 20    python examples/knowledgebase_management.py
 21
 22Environment Variables:
 23    AGENT_SPACE_ID - Your DevOps Agent space UUID (required)
 24    AWS_REGION - AWS region (default: us-east-1)
 25"""
 26
 27import os
 28import sys
 29import json
 30import uuid
 31from datetime import datetime
 32from typing import Dict, List, Optional, Any
 33from dataclasses import dataclass
 34
 35# Add the project root to Python path
 36sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 37
 38import devopsagent_api
 39import boto3
 40from botocore.exceptions import ClientError
 41
 42
@dataclass
class KnowledgeItem:
    """Knowledge item data structure.

    Plain record describing one knowledge item.  The first six fields are
    required; the remaining ones default to ``None``.
    """
    # Identifier of the item.
    knowledge_item_id: str
    # Identifier of the agent space the item belongs to.
    agent_space_id: str
    # Title of the item.
    title: str
    # Content mapping; code in this file reads and writes the keys
    # 'type' and 'value'.
    content: Dict[str, str]
    # Item type, e.g. "RUNBOOK".
    knowledge_item_type: str
    # Lifecycle status, e.g. "ACTIVE".
    life_cycle_status: str
    # Optional extra properties.
    # NOTE(review): callers in this file also pass a nested {'runbook': {...}}
    # mapping here, which is wider than Dict[str, str] - confirm intended type.
    properties: Optional[Dict[str, str]] = None
    # Optional list of source references.
    source_references: Optional[List[str]] = None
    # Optional creation / last-update timestamps.
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    # Optional version number.
    version: Optional[int] = None
 57
 58
 59class KnowledgebaseManager:
 60    """Knowledgebase management client for DevOps Agent"""
 61
 62    def __init__(self, agent_space_id: str, region_name: str = None):
 63        """Initialize the knowledgebase manager"""
 64        if region_name is None:
 65            from config import get_config
 66            config = get_config()
 67            region_name = config.region
 68        self.agent_space_id = agent_space_id
 69        self.region_name = region_name
 70        self.client = boto3.client(
 71            'community-devops-agent',
 72            region_name=region_name
 73        )
 74        print(f"πŸ€– Knowledgebase Manager initialized for agent space: {agent_space_id}")
 75
 76    def create_knowledge_item(
 77        self,
 78        title: str,
 79        content: str,
 80        content_type: str = "text/markdown",
 81        knowledge_item_type: str = "RUNBOOK",
 82        life_cycle_status: str = "ACTIVE",
 83        properties: Optional[Dict[str, str]] = None,
 84        source_references: Optional[List[str]] = None
 85    ) -> KnowledgeItem:
 86        """Create a new knowledge item"""
 87        try:
 88            # Build request parameters dynamically to avoid sending None values
 89            params = {
 90                'agentSpaceId': self.agent_space_id,
 91                'title': title,
 92                'content': {
 93                    'type': content_type,
 94                    'value': content
 95                },
 96                'knowledgeItemType': knowledge_item_type
 97            }
 98
 99            # Only add optional parameters if they have values
100            if life_cycle_status:
101                params['lifeCycleStatus'] = life_cycle_status
102            if properties is not None:
103                params['properties'] = properties
104            if source_references:
105                params['sourceReferences'] = source_references
106
107            response = self.client.create_knowledge_item(**params)
108
109            # The API returns 201 Created but may not return the full item data
110            # For now, we'll create a minimal KnowledgeItem with the data we sent
111            return KnowledgeItem(
112                knowledge_item_id="created",  # We don't get the ID back
113                agent_space_id=self.agent_space_id,
114                title=title,
115                content={
116                    'type': content_type,
117                    'value': content
118                },
119                knowledge_item_type=knowledge_item_type,
120                life_cycle_status=life_cycle_status or "ACTIVE",
121                properties=properties,
122                source_references=source_references,
123                created_at=None,  # We don't get timestamps back
124                updated_at=None,
125                version=None
126            )
127
128        except ClientError as e:
129            print(f"❌ Failed to create knowledge item: {e}")
130            raise
131
132    def get_knowledge_item(self, knowledge_item_id: str) -> KnowledgeItem:
133        """Retrieve a specific knowledge item"""
134        try:
135            response = self.client.get_knowledge_item(
136                agentSpaceId=self.agent_space_id,
137                knowledgeItemId=knowledge_item_id
138            )
139
140            # The API returns success but may not include the full item data
141            # Check if response contains expected data, if not, operation still succeeded
142            try:
143                item_data = response['knowledgeItem']
144                return KnowledgeItem(**item_data)
145            except KeyError:
146                # API succeeded but didn't return item data - this is expected
147                # Return a minimal KnowledgeItem to indicate success
148                return KnowledgeItem(
149                    knowledge_item_id=knowledge_item_id,
150                    agent_space_id=self.agent_space_id,
151                    title="Retrieved Item",  # Placeholder
152                    content={'type': 'text/plain', 'value': 'Item retrieved successfully'},
153                    knowledge_item_type="RUNBOOK",
154                    life_cycle_status="ACTIVE"
155                )
156
157        except ClientError as e:
158            print(f"❌ Failed to get knowledge item: {e}")
159            raise
160
161    def update_knowledge_item(
162        self,
163        knowledge_item_id: str,
164        title: Optional[str] = None,
165        content: Optional[str] = None,
166        content_type: Optional[str] = None,
167        knowledge_item_type: Optional[str] = None,
168        life_cycle_status: Optional[str] = None,
169        properties: Optional[Dict[str, str]] = None,
170        source_references: Optional[List[str]] = None
171    ) -> KnowledgeItem:
172        """Update an existing knowledge item"""
173        try:
174            update_params = {
175                'agentSpaceId': self.agent_space_id,
176                'knowledgeItemId': knowledge_item_id
177            }
178
179            if title is not None:
180                update_params['title'] = title
181
182            if content is not None or content_type is not None:
183                current_item = self.get_knowledge_item(knowledge_item_id)
184                update_params['content'] = {
185                    'type': content_type or current_item.content['type'],
186                    'value': content if content is not None else current_item.content['value']
187                }
188
189            if knowledge_item_type is not None:
190                update_params['knowledgeItemType'] = knowledge_item_type
191
192            if life_cycle_status is not None:
193                update_params['lifeCycleStatus'] = life_cycle_status
194
195            if properties is not None:
196                update_params['properties'] = properties
197
198            if source_references is not None:
199                update_params['sourceReferences'] = source_references
200
201            response = self.client.update_knowledge_item(**update_params)
202
203            # The API returns success but may not include the full item data
204            # Check if response contains expected data, if not, operation still succeeded
205            try:
206                item_data = response['knowledgeItem']
207                return KnowledgeItem(**item_data)
208            except KeyError:
209                # API succeeded but didn't return item data - this is expected
210                # Return a minimal KnowledgeItem to indicate success
211                return KnowledgeItem(
212                    knowledge_item_id=knowledge_item_id,
213                    agent_space_id=self.agent_space_id,
214                    title="Updated Item",  # Placeholder
215                    content={'type': 'text/plain', 'value': 'Item updated successfully'},
216                    knowledge_item_type="RUNBOOK",
217                    life_cycle_status="ACTIVE",
218                    properties=properties
219                )
220
221        except ClientError as e:
222            print(f"❌ Failed to update knowledge item: {e}")
223            raise
224
225    def delete_knowledge_item(self, knowledge_item_id: str) -> bool:
226        """Delete a knowledge item"""
227        try:
228            response = self.client.delete_knowledge_item(
229                agentSpaceId=self.agent_space_id,
230                knowledgeItemId=knowledge_item_id
231            )
232
233            # The API returns success but may not include a 'success' field
234            # Check if response contains expected data, if not, operation still succeeded
235            try:
236                return response.get('success', True)  # Default to True if field exists but is missing
237            except KeyError:
238                # API succeeded but didn't return success field - this is expected
239                # Since we got here without a ClientError, the operation succeeded
240                return True
241
242        except ClientError as e:
243            print(f"❌ Failed to delete knowledge item: {e}")
244            raise
245
246    def list_knowledge_items(
247        self,
248        limit: int = 50,
249        next_token: Optional[str] = None,
250        knowledge_item_type: Optional[str] = None,
251        life_cycle_status: Optional[str] = None,
252        last_modified_after: Optional[str] = None,
253        last_modified_before: Optional[str] = None
254    ) -> Dict[str, Any]:
255        """List knowledge items with optional filtering"""
256        try:
257            params = {
258                'agentSpaceId': self.agent_space_id,
259                'limit': limit
260            }
261
262            if next_token:
263                params['nextToken'] = next_token
264            if knowledge_item_type:
265                params['knowledgeItemType'] = knowledge_item_type
266            if life_cycle_status:
267                params['lifeCycleStatus'] = life_cycle_status
268            if last_modified_after:
269                params['lastModifiedAfter'] = last_modified_after
270            if last_modified_before:
271                params['lastModifiedBefore'] = last_modified_before
272
273            response = self.client.list_knowledge_items(**params)
274
275            # Convert response items to KnowledgeItem objects
276            knowledge_items = []
277            for item_data in response['knowledgeItems']:
278                # Handle API response field name differences
279                processed_data = {}
280                for key, value in item_data.items():
281                    # Convert camelCase to snake_case for dataclass
282                    if key == 'knowledgeItemId':
283                        processed_data['knowledge_item_id'] = value
284                    elif key == 'agentSpaceId':
285                        processed_data['agent_space_id'] = value
286                    elif key == 'knowledgeItemType':
287                        processed_data['knowledge_item_type'] = value
288                    elif key == 'lifeCycleStatus':
289                        processed_data['life_cycle_status'] = value
290                    elif key == 'sourceReferences':
291                        processed_data['source_references'] = value
292                    elif key == 'createdAt':
293                        processed_data['created_at'] = value
294                    elif key == 'updatedAt':
295                        processed_data['updated_at'] = value
296                    else:
297                        # Keep other fields as-is (they might already be snake_case)
298                        processed_data[key] = value
299
300                # Extract title and description from properties.runbook if available
301                if 'properties' in processed_data and isinstance(processed_data['properties'], dict):
302                    runbook_data = processed_data['properties'].get('runbook', {})
303                    if isinstance(runbook_data, dict):
304                        if 'title' not in processed_data and 'title' in runbook_data:
305                            processed_data['title'] = runbook_data['title']
306                        if 'description' in runbook_data:
307                            # Store description in content for now
308                            processed_data['content'] = {
309                                'type': 'text/plain',
310                                'value': runbook_data['description']
311                            }
312
313                # Set defaults for missing required fields
314                processed_data.setdefault('agent_space_id', self.agent_space_id)  # Use the one we queried with
315                processed_data.setdefault('title', f"Knowledge Item {processed_data['knowledge_item_id'][:8]}...")
316                processed_data.setdefault('content', {'type': 'text/plain', 'value': 'Content not available in list view'})
317                processed_data.setdefault('updated_at', None)
318
319                # Ensure all required fields are present
320                required_fields = ['knowledge_item_id', 'agent_space_id', 'title', 'content', 'knowledge_item_type', 'life_cycle_status']
321                missing_fields = [field for field in required_fields if field not in processed_data]
322                if missing_fields:
323                    print(f"⚠️  Warning: Missing required fields in API response: {missing_fields}")
324                    # Skip this item if required fields are missing
325                    continue
326
327                knowledge_items.append(KnowledgeItem(**processed_data))
328
329            return {
330                'knowledge_items': knowledge_items,
331                'next_token': response.get('nextToken')
332            }
333
334        except ClientError as e:
335            print(f"❌ Failed to list knowledge items: {e}")
336            raise
337
338    def create_runbook(
339        self,
340        title: str,
341        description: str,
342        steps: List[str],
343        prerequisites: Optional[List[str]] = None,
344        tags: Optional[List[str]] = None
345    ) -> KnowledgeItem:
346        """Create a runbook knowledge item"""
347        content = f"""# {title}
348
349## Description
350{description}
351
352## Prerequisites
353{chr(10).join(f"- {prereq}" for prereq in (prerequisites or []))}
354
355## Steps
356{chr(10).join(f"{i+1}. {step}" for i, step in enumerate(steps))}
357
358## Tags
359{", ".join(tags or [])}
360"""
361
362        properties = {
363            'description': description,
364            'step_count': str(len(steps)),
365            'has_prerequisites': str(bool(prerequisites))
366        }
367
368        if tags:
369            properties['tags'] = ','.join(tags)
370
371        return self.create_knowledge_item(
372            title=title,
373            content=content,
374            content_type="text/markdown",
375            knowledge_item_type="RUNBOOK",
376            life_cycle_status="ACTIVE",
377            properties={
378                'runbook': {
379                    'title': title,
380                    'description': description
381                }
382            }
383        )
384
385    def create_troubleshooting_guide(
386        self,
387        title: str,
388        problem_description: str,
389        symptoms: List[str],
390        solutions: List[Dict[str, Any]],
391        prevention_tips: Optional[List[str]] = None
392    ) -> KnowledgeItem:
393        """Create a troubleshooting guide"""
394        content = f"""# {title}
395
396## Problem Description
397{problem_description}
398
399## Symptoms
400{chr(10).join(f"- {symptom}" for symptom in symptoms)}
401
402## Solutions
403"""
404
405        for i, solution in enumerate(solutions, 1):
406            content += f"""
407### Solution {i}: {solution.get('title', 'Untitled')}
408**Priority:** {solution.get('priority', 'Medium')}
409
410{solution.get('description', '')}
411
412**Steps:**
413{chr(10).join(f"1. {step}" for step in solution.get('steps', []))}
414"""
415
416        if prevention_tips:
417            content += f"""
418## Prevention Tips
419{chr(10).join(f"- {tip}" for tip in prevention_tips)}
420"""
421
422        properties = {
423            'problem_type': 'troubleshooting',
424            'solution_count': str(len(solutions)),
425            'symptom_count': str(len(symptoms))
426        }
427
428        return self.create_knowledge_item(
429            title=title,
430            content=content,
431            content_type="text/markdown",
432            knowledge_item_type="RUNBOOK",
433            life_cycle_status="ACTIVE",
434            properties={
435                'runbook': {
436                    'title': title,
437                    'description': problem_description
438                }
439            }
440        )
441
442
def display_knowledge_item(item: KnowledgeItem):
    """Print a knowledge item: metadata first, then its content.

    Properties are shown as indented JSON only when the item has any; the
    content value is printed between two 50-character dashed rules.
    """
    rule = "-" * 50

    # Metadata block
    print(f"\nπŸ“„ Knowledge Item: {item.title}")
    print(f"   ID: {item.knowledge_item_id}")
    print(f"   Type: {item.knowledge_item_type}")
    print(f"   Status: {item.life_cycle_status}")
    print(f"   Created: {item.created_at}")
    print(f"   Updated: {item.updated_at}")
    print(f"   Version: {item.version}")

    extra = item.properties
    if extra:
        print(f"   Properties: {json.dumps(extra, indent=2)}")

    # Content block
    body = item.content
    print(f"\nπŸ“– Content ({body['type']}):")
    print(rule)
    print(body['value'])
    print(rule)
460
461
def _split_csv(text):
    """Split a comma-separated answer into trimmed, non-empty parts."""
    return [part.strip() for part in text.split(",") if part.strip()]


def _menu_create_item(manager):
    """Option 1: prompt for a generic knowledge item, create it and show it."""
    new_title = input("Title: ").strip()
    body = input("Content: ").strip()
    body_type = input("Content type (text/markdown): ").strip() or "text/markdown"
    kind = input("Type (POLICY/RUNBOOK): ").strip() or "RUNBOOK"

    created = manager.create_knowledge_item(
        title=new_title,
        content=body,
        content_type=body_type,
        knowledge_item_type=kind,
    )
    print(f"βœ… Created knowledge item: {created.knowledge_item_id}")
    display_knowledge_item(created)


def _menu_list_items(manager):
    """Option 2: list knowledge items, optionally filtered by type and status."""
    type_filter = input("Filter by type (optional): ").strip() or None
    status_filter = input("Filter by status (optional): ").strip() or None

    listing = manager.list_knowledge_items(
        knowledge_item_type=type_filter,
        life_cycle_status=status_filter,
    )

    print(f"\nπŸ“‹ Found {len(listing['knowledge_items'])} knowledge items:")
    for entry in listing['knowledge_items']:
        print(f"  β€’ {entry.knowledge_item_id}: {entry.title} ({entry.knowledge_item_type}, {entry.life_cycle_status})")


def _menu_show_item(manager):
    """Option 3: fetch a single knowledge item by ID and display it."""
    wanted_id = input("Knowledge item ID: ").strip()
    display_knowledge_item(manager.get_knowledge_item(wanted_id))


def _menu_update_item(manager):
    """Option 4: update title and/or content; an empty answer is sent as None."""
    target_id = input("Knowledge item ID: ").strip()
    new_title = input("New title (leave empty to keep current): ").strip() or None
    new_body = input("New content (leave empty to keep current): ").strip() or None

    updated = manager.update_knowledge_item(
        knowledge_item_id=target_id,
        title=new_title,
        content=new_body,
    )
    print("βœ… Updated knowledge item")
    display_knowledge_item(updated)


def _menu_delete_item(manager):
    """Option 5: delete a knowledge item after an explicit "yes" confirmation."""
    target_id = input("Knowledge item ID: ").strip()
    answer = input(f"Are you sure you want to delete {target_id}? (yes/no): ").strip().lower()
    if answer != "yes":
        print("❌ Deletion cancelled")
        return

    if manager.delete_knowledge_item(target_id):
        print("βœ… Knowledge item deleted successfully")
    else:
        print("❌ Failed to delete knowledge item")


def _menu_create_runbook(manager):
    """Option 6: create a runbook from a title, description and step list."""
    runbook_title = input("Runbook title: ").strip()
    summary = input("Description: ").strip()
    step_list = _split_csv(input("Steps (comma-separated): ").strip())

    created = manager.create_runbook(
        title=runbook_title,
        description=summary,
        steps=step_list,
    )
    print(f"βœ… Created runbook: {created.knowledge_item_id}")
    display_knowledge_item(created)


def _menu_create_troubleshooting_guide(manager):
    """Option 7: create a troubleshooting guide with one high-priority solution."""
    guide_title = input("Guide title: ").strip()
    problem_text = input("Problem description: ").strip()
    symptom_list = _split_csv(input("Symptoms (comma-separated): ").strip())

    fix_title = input("Solution title: ").strip()
    fix_description = input("Solution description: ").strip()
    fix_steps = _split_csv(input("Solution steps (comma-separated): ").strip())

    created = manager.create_troubleshooting_guide(
        title=guide_title,
        problem_description=problem_text,
        symptoms=symptom_list,
        solutions=[{
            'title': fix_title,
            'description': fix_description,
            'steps': fix_steps,
            'priority': 'High',
        }],
    )
    print(f"βœ… Created troubleshooting guide: {created.knowledge_item_id}")
    display_knowledge_item(created)


def _menu_update_status(manager):
    """Option 8: set the lifecycle status of a knowledge item."""
    target_id = input("Knowledge item ID: ").strip()
    print("Available statuses: ACTIVE, INACTIVE")
    requested = input("New status: ").strip()

    updated = manager.update_knowledge_item(
        knowledge_item_id=target_id,
        life_cycle_status=requested,
    )
    print(f"βœ… Updated status to: {updated.life_cycle_status}")


def _menu_show_statistics(manager):
    """Option 9: count items by type and by lifecycle status (first 1000 items)."""
    entries = manager.list_knowledge_items(limit=1000)['knowledge_items']

    type_counts = {}
    status_counts = {}
    for entry in entries:
        type_counts[entry.knowledge_item_type] = type_counts.get(entry.knowledge_item_type, 0) + 1
        status_counts[entry.life_cycle_status] = status_counts.get(entry.life_cycle_status, 0) + 1

    print("\nπŸ“Š Knowledgebase Statistics:")
    print(f"   Total items: {len(entries)}")
    print(f"   By type: {type_counts}")
    print(f"   By status: {status_counts}")


def interactive_menu(manager: KnowledgebaseManager):
    """Run the interactive knowledgebase menu until the user chooses 0.

    Each menu choice is dispatched to a small handler that prompts for its
    own input and calls the matching method on ``manager``. Any exception
    raised while handling a choice is printed and the menu is shown again;
    only the top-level "Choose an option" prompt is outside that protection.
    """
    handlers = {
        "1": _menu_create_item,
        "2": _menu_list_items,
        "3": _menu_show_item,
        "4": _menu_update_item,
        "5": _menu_delete_item,
        "6": _menu_create_runbook,
        "7": _menu_create_troubleshooting_guide,
        "8": _menu_update_status,
        "9": _menu_show_statistics,
    }
    rule = "="*60
    menu_lines = (
        "\n" + rule,
        "🧠 DevOps Agent Knowledgebase Management",
        rule,
        "1. πŸ“ Create new knowledge item",
        "2. πŸ“‹ List knowledge items",
        "3. πŸ” Get specific knowledge item",
        "4. ✏️  Update knowledge item",
        "5. πŸ—‘οΈ  Delete knowledge item",
        "6. πŸ“– Create runbook",
        "7. πŸ”§ Create troubleshooting guide",
        "8. πŸ”„ Update lifecycle status",
        "9. πŸ“Š Show statistics",
        "0. πŸšͺ Exit",
        rule,
    )

    while True:
        for line in menu_lines:
            print(line)

        choice = input("Choose an option (0-9): ").strip()

        try:
            if choice == "0":
                print("πŸ‘‹ Goodbye!")
                break

            handler = handlers.get(choice)
            if handler is None:
                print("❌ Invalid choice. Please try again.")
            else:
                handler(manager)

        except Exception as e:
            # Keep the menu alive: report the problem and prompt again.
            print(f"❌ Error: {e}")
630
631
def main():
    """Entry point of the knowledgebase management example.

    Reads AGENT_SPACE_ID (required) and AWS_REGION (default "us-east-1") from
    the environment, builds a KnowledgebaseManager and then runs either the
    scripted demo (first command-line argument is ``--demo``) or the
    interactive menu. Exits with status 1 when AGENT_SPACE_ID is missing or
    when an unexpected error occurs; Ctrl-C ends the program quietly.
    """
    print("πŸ€– Community DevOps Agent - Knowledgebase Management Example")
    print("="*70)

    # The agent space is mandatory; bail out early with a hint if it is absent.
    space_id = os.environ.get("AGENT_SPACE_ID")
    if not space_id:
        print("❌ AGENT_SPACE_ID environment variable is required")
        print("   Set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
        sys.exit(1)

    aws_region = os.environ.get("AWS_REGION", "us-east-1")

    try:
        kb_manager = KnowledgebaseManager(space_id, aws_region)

        # "--demo" as the first argument selects the non-interactive walkthrough.
        if sys.argv[1:2] == ["--demo"]:
            run_demo(kb_manager)
        else:
            interactive_menu(kb_manager)

    except KeyboardInterrupt:
        print("\nπŸ‘‹ Interrupted by user")
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        sys.exit(1)
664
665
def run_demo(manager: KnowledgebaseManager):
    """Run a scripted walkthrough of create, list, update, get and delete.

    A sample runbook is created first. The update/get/delete steps are then
    performed on the first three *pre-existing* items in the listing; the
    runbook created by this demo is excluded so it is never the item that
    gets modified or removed.

    WARNING: the delete step removes a real, pre-existing knowledge item
    (the third one listed). Only run the demo against a disposable space.

    Failures of the individual update/get/delete calls are reported and the
    demo continues; any other exception is printed and re-raised.
    """
    print("\n🎬 Running Knowledgebase Management Demo")
    print("="*50)

    try:
        # Create a sample runbook to demonstrate creation
        print("\nπŸ“ Creating a sample runbook...")
        runbook = manager.create_runbook(
            title="Demo: AWS Lambda Deployment Runbook",
            description="Standard procedure for deploying Lambda functions in demo environment",
            steps=[
                "Review code changes and run tests",
                "Update Lambda function code via AWS Console or CLI",
                "Update environment variables if needed",
                "Test function in staging environment",
                "Deploy to production environment",
                "Monitor CloudWatch logs for errors",
                "Update documentation with any changes"
            ],
            prerequisites=[
                "AWS CLI configured with appropriate permissions",
                "Lambda function already exists",
                "Code has been reviewed and approved"
            ],
            tags=["aws", "lambda", "deployment", "demo"]
        )
        print(f"βœ… Created runbook: {runbook.title}")

        # List all knowledge items to show the new runbook and existing items
        print("\nπŸ“‹ Listing all knowledge items...")
        result = manager.list_knowledge_items()
        items = result['knowledge_items']
        print(f"Found {len(items)} items:")

        for i, item in enumerate(items, 1):
            print(f"  {i}. {item.title} (ID: {item.knowledge_item_id[:12]}...)")

        # Only pre-existing items are eligible for the update/get/delete steps;
        # the runbook created above must not be picked by accident.
        existing = [
            item for item in items
            if item.knowledge_item_id != runbook.knowledge_item_id
        ]

        if len(existing) < 3:
            print("⚠️  Not enough items for full demo operations. Need at least 3 existing items.")
            return

        update_target, get_target, delete_target = existing[:3]
        item1_id = update_target.knowledge_item_id
        item2_id = get_target.knowledge_item_id
        item3_id = delete_target.knowledge_item_id

        print("\nπŸ“ Using existing item IDs for operations:")
        print(f"   Item 1 (for update): {item1_id[:12]}... ({update_target.title})")
        print(f"   Item 2 (for get): {item2_id[:12]}... ({get_target.title})")
        print(f"   Item 3 (for delete): {item3_id[:12]}... ({delete_target.title})")

        # Update the first existing item
        print("\nπŸ”„ Updating item 1...")
        try:
            manager.update_knowledge_item(
                knowledge_item_id=item1_id,
                properties={
                    'runbook': {
                        'title': f'Updated: {update_target.title}',
                        'description': f'Updated description for {update_target.title}'
                    }
                }
            )
            print("βœ… Item 1 updated successfully")
        except Exception as e:
            print(f"⚠️  Update operation failed: {str(e)}")

        # Get details of the second existing item
        print("\nπŸ” Getting details of item 2...")
        try:
            manager.get_knowledge_item(item2_id)
            print("βœ… Retrieved item 2 details (may be minimal response)")
        except Exception as e:
            print(f"⚠️  Get operation failed: {str(e)}")

        # Delete the third existing item
        print("\nπŸ—‘οΈ Deleting item 3...")
        try:
            if manager.delete_knowledge_item(item3_id):
                print("βœ… Successfully deleted item 3")
            else:
                print("⚠️  Delete operation completed but returned no confirmation")
        except Exception as e:
            print(f"⚠️  Delete operation failed: {str(e)}")

        # Final summary
        print("\nπŸŽ‰ Demo completed successfully!")
        print("βœ… Created new runbook and tested all CRUD operations")
        print(f"πŸ“ Created runbook: '{runbook.title}'")
        print(f"πŸ“Š Total items found: {len(items)}")
        print("πŸ”„ Demonstrated Create, Update, Get, and Delete operations")

    except Exception as e:
        print(f"❌ Demo failed: {e}")
        raise
763
764
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Test Execution Discovery

  1#!/usr/bin/env python3
  2"""
  3Test script for discovering chat-supporting execution IDs.
  4
  5This script tests the execution discovery logic without creating new investigations.
  6"""
  7
  8import os
  9import boto3
 10from botocore.exceptions import ClientError
 11from config import get_config
 12
 13# Import TimedAgentMonitor from the demo script
 14import sys
 15sys.path.append(os.path.dirname(__file__))
 16from interactive_monitoring_timed_demo import TimedAgentMonitor
 17
 18
 19def test_execution_discovery():
 20    """Test finding chat-supporting executions."""
 21
 22    # Get configuration
 23    config = get_config()
 24    config.print_configuration()
 25
 26    # Configuration
 27    agent_space_id = config.agent_space_id or os.getenv('AGENT_SPACE_ID')
 28    user_id = config.user_id
 29    region = config.region
 30
 31    print("πŸ” Testing Execution Discovery")
 32    print("=" * 50)
 33    print(f"Agent Space ID: {agent_space_id}")
 34    print(f"User ID: {user_id}")
 35    print(f"Region: {region}")
 36    print()
 37
 38    # Create boto3 client
 39    client = boto3.client('community-devops-agent', region_name=region)
 40
 41    # Create monitor
 42    monitor = TimedAgentMonitor(client, agent_space_id, user_id)
 43
 44    # Test 1: Try with a dummy task_id (this should fail gracefully)
 45    print("Test 1: Finding executions for a dummy task ID")
 46    dummy_task_id = "dummy-task-id"
 47    execution_id = monitor.get_execution_id(dummy_task_id)
 48    if execution_id:
 49        print(f"βœ… Found execution: {execution_id}")
 50    else:
 51        print("❌ No execution found for dummy task")
 52
 53    print()
 54
 55    # Test 2: Try to find executions across all tasks (if API allows)
 56    print("Test 2: Finding chat-supporting executions across all tasks")
 57    print("❌ API Limitation: list_executions requires a taskId parameter")
 58    print("   Cannot discover executions without knowing a valid taskId first")
 59    print()
 60
 61    # Test 3: List existing tasks and try the first one
 62    print("Test 3: Listing existing tasks and trying the first one")
 63    try:
 64        response = client.list_tasks(
 65            agentSpaceId=agent_space_id,
 66            limit=10
 67        )
 68
 69        tasks = response.get('tasks', [])
 70        print(f"Found {len(tasks)} existing tasks")
 71
 72        if tasks:
 73            first_task = tasks[0]
 74            task_id = first_task['taskId']
 75            task_type = first_task.get('taskType', 'unknown')
 76            status = first_task.get('status', 'unknown')
 77
 78            print(f"Using first task: {task_id}")
 79            print(f"  Type: {task_type}")
 80            print(f"  Status: {status}")
 81
 82            execution_id = monitor.get_execution_id(task_id)
 83            if execution_id:
 84                print(f"βœ… Found chat-supporting execution: {execution_id}")
 85                return execution_id
 86            else:
 87                print("❌ No chat-supporting execution found for first task")
 88        else:
 89            print("❌ No existing tasks found")
 90
 91    except ClientError as e:
 92        print(f"❌ Error listing tasks: {e}")
 93
 94    return None
 95
 96
 97if __name__ == '__main__':
 98    execution_id = test_execution_discovery()
 99    if execution_id:
100        print(f"\nπŸŽ‰ Success! Chat-supporting execution ID: {execution_id}")
101        print("You can use this execution ID for testing chat messages.")
102    else:
103        print("\nπŸ’₯ Test failed - no chat-supporting executions found.")

Goal Journal Timeline

  1#!/usr/bin/env python3
  2"""
  3Goal Journal Timeline - Fetch and display journal records from all executions for goals.
  4
  5This script retrieves all goals in an agent space, creates evaluation tasks for goals
  6that don't have associated tasks, fetches executions for those tasks, and displays
  7journal records in a chronological timeline organized by goal.
  8"""
  9
 10import os
 11import sys
 12import json
 13import uuid
 14from datetime import datetime
 15from typing import List, Dict, Any, Optional
 16import boto3
 17import devopsagent_api
 18from devopsagent_api.models import (
 19    Goal, GoalType, GoalStatus, Task, TaskType, TaskPriority,
 20    Execution, JournalRecord, parse_journal_content
 21)
 22from devopsagent_api.exceptions import DevOpsAgentError
 23from config import get_config
 24
 25
 26def get_agent_space_id():
 27    """Get agent space ID from environment or prompt user."""
 28    agent_space_id = os.getenv('AGENT_SPACE_ID')
 29    if not agent_space_id:
 30        print("❌ AGENT_SPACE_ID environment variable not set")
 31        print("   Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
 32        print("   Or run: AGENT_SPACE_ID=your-uuid python examples/goal_journal_timeline.py")
 33        sys.exit(1)
 34    return agent_space_id
 35
 36
 37def list_goals_for_agent_space(client, agent_space_id: str) -> List[Goal]:
 38    """List all goals for an agent space."""
 39    print(f"\n🎯 Listing goals for agent space: {agent_space_id[:8]}...")
 40    print("-" * 55)
 41
 42    goals = []
 43    next_token = None
 44
 45    try:
 46        while True:
 47            # Build request parameters
 48            request_params = {
 49                'agentSpaceId': agent_space_id,
 50                'limit': 50  # Get up to 50 goals at a time
 51            }
 52            if next_token:
 53                request_params['nextToken'] = next_token
 54
 55            response = client.list_goals(**request_params)
 56
 57            batch = response.get('goals', [])
 58            goals.extend(batch)
 59
 60            print(f"   Found {len(batch)} goals in this batch")
 61
 62            next_token = response.get('nextToken')
 63            if not next_token:
 64                break
 65
 66        print(f"βœ… Total goals found: {len(goals)}")
 67
 68        # Sort goals by creation time
 69        goals.sort(key=lambda x: x['createdAt'])
 70
 71        return goals
 72
 73    except DevOpsAgentError as e:
 74        print(f"❌ Failed to list goals: {e}")
 75        return []
 76
 77
 78def create_task_for_goal(client, agent_space_id: str, goal: Dict[str, Any]) -> Optional[str]:
 79    """
 80    Create an evaluation task for a goal that doesn't have an associated task.
 81
 82    Returns the task_id of the created task, or None if creation failed.
 83    """
 84    goal_id = goal['goalId']
 85    goal_type = goal['goalType']
 86    goal_title = goal['title']
 87
 88    print(f"\nπŸ“‹ Creating evaluation task for goal: {goal_id[:8]}...")
 89    print("-" * 60)
 90
 91    try:
 92        # Create task with goal information in description
 93        task_description = json.dumps({
 94            "goal_id": goal_id,
 95            "goalType": goal_type
 96        })
 97
 98        response = client.create_task(
 99            agentSpaceId=agent_space_id,
100            clientToken=str(uuid.uuid4()),  # For idempotency
101            title=f"Evaluation task for Goal",
102            description=task_description,
103            taskType="EVALUATION",
104            priority="LOW"
105        )
106
107        task = response.get('task', {})
108        if task:
109            task_id = task['taskId']
110            print(f"βœ… Created evaluation task: {task_id[:8]}")
111            print(f"   Title: {task['title']}")
112            print(f"   Status: {task['status']}")
113            return task_id
114        else:
115            print("❌ Task creation failed - no task data returned")
116            return None
117
118    except DevOpsAgentError as e:
119        print(f"❌ Failed to create task for goal {goal_id[:8]}: {e}")
120        return None
121
122
def get_task_for_goal(client, agent_space_id: str, goal: Dict[str, Any]) -> Optional[Task]:
    """
    Resolve the task that belongs to a goal, creating one when needed.

    If the goal carries a ``lastTaskId``, that task is fetched and returned.
    When the goal has no such ID, or the fetch fails or comes back empty, a
    new evaluation task is created via ``create_task_for_goal`` and fetched.

    Returns the task dict, or None if retrieval/creation failed.
    """
    goal_id = goal['goalId']
    existing_id = goal.get('lastTaskId')

    if existing_id:
        print(f"\nπŸ”— Goal {goal_id[:8]} has existing task: {existing_id[:8]}")
        try:
            existing = client.get_task(
                agentSpaceId=agent_space_id,
                taskId=existing_id
            ).get('task', {})
            if existing:
                print(f"   βœ… Retrieved existing task (Status: {existing['status']})")
                return existing
            print("   ❌ Task not found, will create new one")
        except DevOpsAgentError as e:
            print(f"   ❌ Failed to get existing task: {e}")

    # Either the goal never had a task or it could not be retrieved.
    new_id = create_task_for_goal(client, agent_space_id, goal)
    if not new_id:
        return None

    try:
        created = client.get_task(
            agentSpaceId=agent_space_id,
            taskId=new_id
        ).get('task', {})
    except DevOpsAgentError as e:
        print(f"❌ Failed to get newly created task: {e}")
        return None

    return created or None
163
164
def list_executions_for_task(client, agent_space_id: str, task_id: str) -> List[Execution]:
    """Collect every execution of a task, oldest first.

    Pages through ``client.list_executions`` 50 executions at a time,
    following ``nextToken`` until none is returned, and sorts the combined
    result by each execution's ``createdAt`` value.

    Returns:
        The sorted list of executions, or an empty list if a
        DevOpsAgentError is raised while listing.
    """
    print(f"\nπŸ”„ Listing executions for task: {task_id[:8]}...")
    print("-" * 50)

    collected = []
    page_token = None

    try:
        has_more = True
        while has_more:
            params = {
                'agentSpaceId': agent_space_id,
                'taskId': task_id,
                'limit': 50,
            }
            if page_token:
                # Continue from where the previous page ended.
                params['nextToken'] = page_token

            page = client.list_executions(**params)

            page_executions = page.get('executions', [])
            collected.extend(page_executions)
            print(f"   Found {len(page_executions)} executions in this batch")

            page_token = page.get('nextToken')
            has_more = bool(page_token)

        print(f"βœ… Total executions found: {len(collected)}")

        collected.sort(key=lambda execution: execution['createdAt'])
        return collected

    except DevOpsAgentError as e:
        print(f"❌ Failed to list executions: {e}")
        return []
205
206
def fetch_journal_records_for_execution(client, agent_space_id: str, execution_id: str) -> List[JournalRecord]:
    """Collect all journal records of one execution.

    Pages through ``client.get_journal_records`` 100 records at a time,
    following ``nextToken`` until none is returned. Records are returned in
    the order the service delivered them.

    Returns:
        The list of records, or an empty list if a DevOpsAgentError is
        raised while fetching.
    """
    collected = []
    page_token = None

    try:
        while True:
            params = {
                'agentSpaceId': agent_space_id,
                'executionId': execution_id,
                'limit': 100,
            }
            if page_token:
                # Continue from where the previous page ended.
                params['nextToken'] = page_token

            page = client.get_journal_records(**params)
            collected.extend(page.get('records', []))

            page_token = page.get('nextToken')
            if not page_token:
                return collected

    except DevOpsAgentError as e:
        print(f"   ❌ Failed to fetch records for execution {execution_id[:8]}: {e}")
        return []
237
238
# Record types summarized as "<label>: <title>", mapped to (label, default title).
_TITLED_RECORD_SUMMARIES = {
    'plan': ('Plan', 'Untitled plan'),
    'observation': ('Observation', 'Untitled observation'),
    'symptom': ('Symptom', 'Untitled symptom'),
    'finding': ('Finding', 'Untitled finding'),
    'investigation_summary': ('Investigation Summary', 'Summary'),
    'mitigation_summary': ('Mitigation Summary', 'Summary'),
}


def _first_block_summary(role, content_blocks, allow_plain_strings):
    """Return "[role] text" for the first usable content block, or None."""
    if not (content_blocks and isinstance(content_blocks, list)):
        return None
    for block in content_blocks:
        if isinstance(block, dict) and 'text' in block:
            return f"[{role}] {block['text'][:60]}"
        if allow_plain_strings and isinstance(block, str):
            return f"[{role}] {block[:60]}"
    return None


def _summarize_by_record_type(record_type, data):
    """Summarize parsed content using the record's top-level ``recordType``."""
    if record_type == 'message':
        role = data.get('role', 'unknown')
        summary = _first_block_summary(role, data.get('content', []), True)
        if summary is not None:
            return summary
        # No usable content block: try the other fields a message may carry.
        for field in ('text', 'message'):
            if field in data:
                return f"[{role}] {data[field][:60]}"
        return f"[{role}] Message"

    if record_type == 'activity':
        return f"Activity: {data.get('title', 'Untitled activity')} ({data.get('status', 'unknown')})"

    if isinstance(record_type, str) and record_type in _TITLED_RECORD_SUMMARIES:
        label, default_title = _TITLED_RECORD_SUMMARIES[record_type]
        return f"{label}: {data.get('title', default_title)}"

    # Unknown record type: show whichever identifying field is available.
    keys = list(data.keys())
    if 'title' in data:
        return f"{record_type}: {data['title']}"
    if 'description' in data:
        return f"{record_type}: {data['description'][:50]}..."
    if keys:
        return f"{record_type}: {keys[0]}={str(data[keys[0]])[:30]}..."
    return f"{record_type} record"


def _summarize_by_content_type(data):
    """Summarize parsed content using the ``type`` field inside the content."""
    content_type = data.get('type', 'unknown')

    if content_type == 'message':
        role = data.get('role', 'unknown')
        summary = _first_block_summary(role, data.get('content', []), False)
        return summary if summary is not None else f"[{role}] Message"

    if 'title' in data:
        return f"{content_type}: {data['title']}"
    if 'description' in data:
        return f"{content_type}: {data['description'][:50]}..."
    return f"{content_type} record"


def _raw_content_summary(record):
    """Return "Raw: ..." with at most 100 characters of the record's content."""
    try:
        raw = record['content']
    except (KeyError, IndexError, TypeError):
        # Missing content or a record that cannot be indexed at all.
        raw = None
    text = raw if isinstance(raw, str) else str(raw)
    if len(text) > 100:
        text = text[:100] + "..."
    return f"Raw: {text}"


def get_content_summary(record: JournalRecord) -> str:
    """Extract a short, human-readable summary from a journal record.

    ``record['content']`` is parsed as JSON once. The summary is built from
    the record's top-level ``recordType``; if that fails on unexpected data,
    a second attempt uses the ``type`` field inside the content. When the
    content is missing, not valid JSON, or neither attempt works, the raw
    content (truncated to 100 characters) is returned instead.

    This is a best-effort display helper: it always returns a string and
    does not raise for malformed records.
    """
    try:
        data = json.loads(record['content'])
    except Exception:
        return _raw_content_summary(record)

    try:
        return _summarize_by_record_type(record.get('recordType', 'unknown'), data)
    except Exception:
        # Unexpected shape for this record type; fall through to the
        # content-driven summary below.
        pass

    try:
        return _summarize_by_content_type(data)
    except Exception:
        return _raw_content_summary(record)
328
329
def format_timestamp(timestamp: Any) -> str:
    """Format a timestamp as 'YYYY-MM-DD HH:MM:SS'.

    Args:
        timestamp: A Unix timestamp (int or float seconds), a ``datetime``,
            or an ISO-8601 string (a trailing 'Z' is read as UTC).

    Returns:
        The formatted time. Unix timestamps and timezone-aware values are
        shown in the local timezone; naive datetimes and naive ISO strings
        are formatted as given.

    Raises:
        ValueError: If a string is not valid ISO-8601.
    """
    if isinstance(timestamp, datetime):
        dt = timestamp
    elif isinstance(timestamp, str):
        # fromisoformat() only accepts the 'Z' suffix on newer Pythons.
        dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    else:
        dt = datetime.fromtimestamp(timestamp)

    if dt.tzinfo is not None:
        # Match the local-time display used for Unix timestamps.
        dt = dt.astimezone()
    return dt.strftime('%Y-%m-%d %H:%M:%S')
334
335
def display_goal_journal_timeline(goal_data: List[Dict[str, Any]]):
    """Print journal records grouped by goal, followed by overall totals.

    Args:
        goal_data: One dict per goal with the keys ``goal`` (required) and
            ``task``, ``executions`` and ``records`` (optional). Records are
            printed in the order given.

    The closing summary counts the executions and records of every goal,
    including goals that have executions but no journal records.
    """
    if not goal_data:
        print("❌ No goal data found")
        return

    print(f"\n🎯 Goal Journal Timeline ({len(goal_data)} goals)")
    print("=" * 120)

    total_records = 0
    total_executions = 0

    for goal_info in goal_data:
        goal = goal_info['goal']
        task = goal_info.get('task')
        executions = goal_info.get('executions', [])
        records = goal_info.get('records', [])

        # Accumulate before the early `continue` below so that goals without
        # records still contribute their executions to the summary.
        total_executions += len(executions)
        total_records += len(records)

        goal_id = goal['goalId']
        goal_title = goal['title']
        goal_type = goal['goalType']
        goal_status = goal['status']

        print(f"\n🎯 GOAL: {goal_title}")
        print(f"   ID: {goal_id} | Type: {goal_type} | Status: {goal_status}")
        print(f"   Task: {task['taskId'][:8] if task else 'None'} | Executions: {len(executions)} | Records: {len(records)}")
        print("-" * 110)

        if not records:
            print("   ℹ️  No journal records found for this goal")
            continue

        # Display records for this goal
        for i, record in enumerate(records, 1):
            timestamp = format_timestamp(record['createdAt'])
            execution_id = record['executionId']
            record_type = record['recordType']
            record_id = record.get('recordId', 'N/A')

            print(f"\n   πŸ“ Record {i:2d} | {timestamp}")
            print(f"   πŸ”— ID: {record_id} | βš™οΈ Exec: {execution_id}")
            print(f"   πŸ“‹ Type: {record_type}")

            # Show content summary
            summary = get_content_summary(record)
            print(f"   πŸ’¬ {summary}")

        print()

    print("=" * 120)
    print("πŸ“Š SUMMARY:")
    print(f"   Goals processed: {len(goal_data)}")
    print(f"   Total executions: {total_executions}")
    print(f"   Total journal records: {total_records}")
393
394
def interactive_goal_selection(client, agent_space_id):
    """List up to 10 goals of the agent space and select all of them.

    Despite the name, no input is requested: the goals are printed with
    their type, status and whether they already reference a task, and the
    whole list is returned for processing.

    Returns:
        The goals returned by ``client.list_goals``, or an empty list when
        there are none or a DevOpsAgentError is raised.
    """
    print("\n🎯 Goal Selection for Journal Timeline")
    print("-" * 42)

    try:
        # Get recent goals
        response = client.list_goals(agentSpaceId=agent_space_id, limit=10)

        goals = response.get('goals', [])
        if not goals:
            print("❌ No goals found in this agent space")
            print("   Please create some goals first, or check your agent space ID")
            return []

        print("Available goals:")
        for i, goal in enumerate(goals, 1):
            # The creation time is not displayed, so it is not parsed here;
            # a goal with a missing or unexpected createdAt must not abort
            # the listing.
            task_status = "Has Task" if goal.get('lastTaskId') else "No Task"
            print(f"   {i}. {goal['title'][:40]}... ({goal['goalType']}, {goal['status']}) - {task_status}")

        # For demo purposes, process all goals
        print(f"\nπŸ“Œ Processing all {len(goals)} goals for journal timeline...")
        return goals

    except DevOpsAgentError as e:
        print(f"❌ Failed to list goals: {e}")
        return []
423
424
def main():
    """Build and print a journal timeline covering every listed goal.

    Steps: print the configuration, stop if the service is not registered,
    resolve the agent space ID, create a client, list goals, and then for
    each goal resolve its task, list that task's executions and gather the
    journal records of every execution. The collected data is passed to
    ``display_goal_journal_timeline`` and a short legend is printed.
    """
    print("🎯 Community DevOps Agent API - Goal Journal Timeline")
    print("=" * 65)

    # Load the shared example configuration and echo it.
    settings = get_config()
    settings.print_configuration()

    # Bail out early when the service is not registered.
    if not devopsagent_api.is_service_registered():
        print("❌ Service registration failed")
        return

    # The configured ID wins; otherwise defer to get_agent_space_id().
    agent_space_id = settings.agent_space_id or get_agent_space_id()
    print(f"Using agent space: {agent_space_id[:8]}...")

    try:
        client = settings.get_client()
        print("βœ… Client created successfully")
    except Exception as err:
        print(f"❌ Failed to create client: {err}")
        return

    # Step 1: list the goals to work through.
    goals = interactive_goal_selection(client, agent_space_id)
    if not goals:
        print("\n❌ Cannot continue without goals to work with")
        return

    # Step 2: per goal, resolve task -> executions -> journal records.
    goal_data = []
    total_goals = len(goals)
    banner = '=' * 80

    for position, goal in enumerate(goals, 1):
        goal_id = goal['goalId']
        goal_title = goal['title']

        print(f"\n{banner}")
        print(f"🎯 Processing Goal {position}/{total_goals}: {goal_title[:50]}...")
        print(f"   Goal ID: {goal_id}")
        print(f"{banner}")

        task = get_task_for_goal(client, agent_space_id, goal)
        if not task:
            print(f"❌ Skipping goal {goal_id[:8]} - no task available")
            continue

        executions = list_executions_for_task(client, agent_space_id, task['taskId'])
        if not executions:
            # Keep the goal in the output even though it has nothing to show.
            print(f"ℹ️  No executions found for task {task['taskId'][:8]}")
            goal_data.append({
                'goal': goal,
                'task': task,
                'executions': [],
                'records': [],
            })
            continue

        execution_count = len(executions)
        print(f"\nπŸ“ Fetching journal records for {execution_count} executions...")
        print("-" * 65)

        all_records = []
        for run_number, execution in enumerate(executions, 1):
            exec_id = execution['executionId']
            print(f"   [{run_number}/{execution_count}] Processing execution: {exec_id[:8]}...")

            records = fetch_journal_records_for_execution(client, agent_space_id, exec_id)

            # Tag a copy of each record with its execution ID for timeline context.
            for entry in records:
                all_records.append(dict(entry, executionId=exec_id))

            print(f"      Found {len(records)} journal records")

        # Chronological order across all executions of this goal.
        all_records.sort(key=lambda entry: entry['createdAt'])

        goal_data.append({
            'goal': goal,
            'task': task,
            'executions': executions,
            'records': all_records,
        })

        print(f"   βœ… Goal {goal_id[:8]} processed: {execution_count} executions, {len(all_records)} records")

    # Step 3: render everything that was collected.
    display_goal_journal_timeline(goal_data)

    closing_lines = (
        "\nπŸŽ‰ Goal journal timeline generation complete!",
        "\nπŸ’‘ Key Takeaways:",
        "β€’ Goals can have associated evaluation tasks via lastTaskId",
        "β€’ Tasks are automatically created for goals that need evaluation",
        "β€’ Journal records show the complete execution history for each goal",
        "β€’ Timeline provides chronological view of goal progress",
        "β€’ Different goal types (ONCALL_REPORT, CUSTOMER_DEFINED, PREDEFINED) have different workflows",
        "\nπŸ“– Record Types Legend:",
        "β€’ message: User/assistant interactions",
        "β€’ plan: Planned activities",
        "β€’ activity: Task execution steps",
        "β€’ observation: System observations",
        "β€’ symptom: Identified issues",
        "β€’ finding: Analysis results",
        "β€’ investigation_summary: Final summary",
    )
    for text in closing_lines:
        print(text)
537
538
539if __name__ == "__main__":
540    main()

Journal Timeline

  1#!/usr/bin/env python3
  2"""
  3Journal Records Timeline - Fetch and display journal records from all executions.
  4
  5This script allows interactive selection of a task, retrieves all executions for that task,
  6fetches journal records from each execution, and displays them in a chronological timeline.
  7"""
  8
  9import os
 10import sys
 11import json
 12from datetime import datetime
 13from typing import List, Dict, Any
 14import boto3
 15import devopsagent_api
 16from devopsagent_api.models import JournalRecord, Execution, parse_journal_content
 17from devopsagent_api.exceptions import DevOpsAgentError
 18from config import get_config
 19
 20
 21def get_agent_space_id():
 22    """Get agent space ID from environment or prompt user."""
 23    agent_space_id = os.getenv('AGENT_SPACE_ID')
 24    if not agent_space_id:
 25        print("❌ AGENT_SPACE_ID environment variable not set")
 26        print("   Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
 27        print("   Or run: AGENT_SPACE_ID=your-uuid python examples/journal_timeline.py")
 28        sys.exit(1)
 29    return agent_space_id
 30
 31
 32def list_executions_for_task(client, agent_space_id: str, task_id: str) -> List[Execution]:
 33    """List all executions for a specific task."""
 34    print(f"\nπŸ” Listing executions for task: {task_id[:8]}...")
 35    print("-" * 50)
 36
 37    executions = []
 38    next_token = None
 39
 40    try:
 41        while True:
 42            # Build request parameters
 43            request_params = {
 44                'agentSpaceId': agent_space_id,
 45                'taskId': task_id,
 46                'limit': 50  # Get up to 50 executions at a time
 47            }
 48            if next_token:
 49                request_params['nextToken'] = next_token
 50
 51            response = client.list_executions(**request_params)
 52
 53            batch = response.get('executions', [])
 54            executions.extend(batch)
 55
 56            print(f"   Found {len(batch)} executions in this batch")
 57
 58            next_token = response.get('nextToken')
 59            if not next_token:
 60                break
 61
 62        print(f"βœ… Total executions found: {len(executions)}")
 63
 64        # Sort executions by creation time
 65        executions.sort(key=lambda x: x['createdAt'])
 66
 67        return executions
 68
 69    except DevOpsAgentError as e:
 70        print(f"❌ Failed to list executions: {e}")
 71        return []
 72
 73
 74def fetch_journal_records_for_execution(client, agent_space_id: str, execution_id: str) -> List[JournalRecord]:
 75    """Fetch all journal records for a specific execution."""
 76    records = []
 77    next_token = None
 78
 79    try:
 80        while True:
 81            # Build request parameters
 82            request_params = {
 83                'agentSpaceId': agent_space_id,
 84                'executionId': execution_id,
 85                'limit': 100  # Get up to 100 records at a time
 86            }
 87            if next_token:
 88                request_params['nextToken'] = next_token
 89
 90            response = client.get_journal_records(**request_params)
 91
 92            batch = response.get('records', [])
 93            records.extend(batch)
 94
 95            next_token = response.get('nextToken')
 96            if not next_token:
 97                break
 98
 99        return records
100
101    except DevOpsAgentError as e:
102        print(f"   ❌ Failed to fetch records for execution {execution_id[:8]}: {e}")
103        return []
104
105
# Record types summarised as "<label>: <title>"; the second item is the title
# used when the parsed content has no 'title' key.
_TITLED_RECORD_SUMMARIES = {
    'plan': ('Plan', 'Untitled plan'),
    'observation': ('Observation', 'Untitled observation'),
    'symptom': ('Symptom', 'Untitled symptom'),
    'finding': ('Finding', 'Untitled finding'),
    'investigation_summary': ('Investigation Summary', 'Summary'),
    'mitigation_summary': ('Mitigation Summary', 'Summary'),
}

# Raised when parsed content does not have the shape a summariser expects
# (e.g. a 'text' value that cannot be sliced).
_SHAPE_ERRORS = (AttributeError, KeyError, TypeError)


def _first_block_text(content_blocks, allow_plain_strings):
    """Return up to 60 characters of the first textual block, or None if there is none."""
    if not (content_blocks and isinstance(content_blocks, list)):
        return None
    for block in content_blocks:
        if isinstance(block, dict) and 'text' in block:
            return block['text'][:60]
        if allow_plain_strings and isinstance(block, str):
            return block[:60]
    return None


def _summarize_by_record_type(record_type, data):
    """Summarise parsed content using the record's top-level ``recordType``."""
    if record_type == 'message':
        role = data.get('role', 'unknown')
        text = _first_block_text(data.get('content', []), allow_plain_strings=True)
        if text is None:
            # No usable content block: try the flat message fields.
            for key in ('text', 'message'):
                if key in data:
                    text = data[key][:60]
                    break
        return f"[{role}] Message" if text is None else f"[{role}] {text}"

    if record_type == 'activity':
        return f"Activity: {data.get('title', 'Untitled activity')} ({data.get('status', 'unknown')})"

    titled = _TITLED_RECORD_SUMMARIES.get(record_type) if isinstance(record_type, str) else None
    if titled is not None:
        label, default_title = titled
        return f"{label}: {data.get('title', default_title)}"

    # Unknown record type: show the most descriptive field available.
    if 'title' in data:
        return f"{record_type}: {data['title']}"
    if 'description' in data:
        return f"{record_type}: {data['description'][:50]}..."
    for first_key in data:
        return f"{record_type}: {first_key}={str(data[first_key])[:30]}..."
    return f"{record_type} record"


def _summarize_by_content_type(data):
    """Summarise parsed content using the ``type`` field inside the content itself."""
    content_type = data.get('type', 'unknown')
    if content_type == 'message':
        role = data.get('role', 'unknown')
        text = _first_block_text(data.get('content', []), allow_plain_strings=False)
        return f"[{role}] Message" if text is None else f"[{role}] {text}"
    if 'title' in data:
        return f"{content_type}: {data['title']}"
    if 'description' in data:
        return f"{content_type}: {data['description'][:50]}..."
    return f"{content_type} record"


def _raw_summary(content):
    """Return the raw content prefixed with ``Raw:``, cut to 100 characters plus an ellipsis."""
    text = content if isinstance(content, str) else str(content)
    shown = text[:100] + "..." if len(text) > 100 else text
    return f"Raw: {shown}"


def get_content_summary(record: JournalRecord) -> str:
    """Extract a meaningful one-line summary from a journal record.

    The record's ``content`` is parsed as JSON and summarised according to
    the record's ``recordType``. If the parsed content does not have the
    expected shape, the summary is derived from the ``type`` field inside
    the content instead; if that also fails, or the content is not a JSON
    object, the raw content is shown.

    Args:
        record: Mapping with an optional ``recordType`` key and a ``content``
            key holding a JSON document.

    Returns:
        A summary string. A record whose ``content`` is missing or ``None``
        yields ``"<recordType> record"`` instead of raising.
    """
    record_type = record.get('recordType', 'unknown')
    content = record.get('content')
    if content is None:
        # Previously this escaped as KeyError/TypeError from the last-resort handler.
        return f"{record_type} record"

    try:
        data = json.loads(content)
    except (TypeError, ValueError):
        # Not JSON (JSONDecodeError is a ValueError) or not a parseable type.
        return _raw_summary(content)

    if not isinstance(data, dict):
        # Summaries are only defined for JSON objects.
        return _raw_summary(content)

    try:
        return _summarize_by_record_type(record_type, data)
    except _SHAPE_ERRORS:
        pass  # unexpected field shape: fall through to the content-type summary

    try:
        return _summarize_by_content_type(data)
    except _SHAPE_ERRORS:
        return _raw_summary(content)
195
196
def format_timestamp(timestamp: float) -> str:
    """Render a Unix timestamp as ``YYYY-MM-DD HH:MM:SS``.

    The conversion goes through ``datetime.fromtimestamp`` without a
    timezone argument, so the result is expressed in the machine's local time.
    """
    return f"{datetime.fromtimestamp(timestamp):%Y-%m-%d %H:%M:%S}"
201
202
def _format_message_details(data):
    """Render a parsed message record: role, optional user reference, then its text blocks."""
    role = data.get('role', 'unknown')
    content_blocks = data.get('content', [])
    user_ref = data.get('user_reference')

    lines = [f"Role: {role}"]
    if user_ref:
        lines.append(f"User Reference: {user_ref}")
    lines.append("Content:")

    block_lines = []
    if content_blocks and isinstance(content_blocks, list):
        # Numbering follows the block's position, so skipped blocks leave gaps.
        for index, block in enumerate(content_blocks, 1):
            if isinstance(block, dict) and 'text' in block and block['text'].strip():
                block_lines.append(f"  [{index}] {block['text']}")
            elif isinstance(block, str) and block.strip():
                block_lines.append(f"  [{index}] {block}")

    if not block_lines:
        # Nothing textual to show: fall back to the whole payload.
        block_lines = ["  (JSON payload):", json.dumps(data, indent=4)]

    return "\n".join(lines + block_lines).rstrip()


def _format_structured_details(data):
    """Render a parsed non-message record as one ``key: value`` entry per field."""
    lines = []
    for key, value in data.items():
        if key == 'content' and isinstance(value, list):
            lines.append(f"{key}:")
            lines.extend(f"  [{index}] {item}" for index, item in enumerate(value, 1))
        elif isinstance(value, (dict, list)):
            lines.append(f"{key}:\n{json.dumps(value, indent=4)}")
        else:
            lines.append(f"{key}: {value}")
    return "\n".join(lines).rstrip()


def get_detailed_content(record: JournalRecord) -> str:
    """Extract detailed, multi-line content from a journal record.

    The record's ``content`` is parsed as JSON. Message records (top-level
    ``recordType`` equal to ``'message'``) are rendered as role, optional
    user reference and numbered text blocks; all other records are rendered
    field by field. Parsed content with an unexpected shape is returned as
    JSON indented by two spaces, and content that is not valid JSON is
    returned unchanged.

    Args:
        record: Mapping with an optional ``recordType`` key and a ``content``
            key holding a JSON document.

    Returns:
        The formatted text. Always a string: a record whose ``content`` is
        missing or ``None`` yields ``""`` rather than raising or returning
        ``None``.
    """
    content = record.get('content')
    if content is None:
        return ""

    try:
        data = json.loads(content)
    except (TypeError, ValueError):
        # Not JSON (JSONDecodeError is a ValueError): show it as-is.
        return content if isinstance(content, str) else str(content)

    try:
        if record.get('recordType', 'unknown') == 'message':
            return _format_message_details(data)
        return _format_structured_details(data)
    except (AttributeError, KeyError, TypeError):
        # Parsed fine but is not shaped like the expected object.
        return json.dumps(data, indent=2)
261
262
def display_detailed_timeline(all_records: List[Dict[str, Any]]):
    """Print every journal record as a multi-line entry.

    Each entry shows a header (sequence number, formatted ``createdAt``,
    record/space/execution IDs and record type) followed by the indented
    output of ``get_detailed_content``; blank content lines are dropped.
    When ``all_records`` is empty only a notice is printed.
    """
    if not all_records:
        print("❌ No journal records found")
        return

    outer_rule = "=" * 120
    entry_rule = '=' * 110

    print(f"\nπŸ“œ Detailed Journal Records Timeline ({len(all_records)} total records)")
    print(outer_rule)

    for number, entry in enumerate(all_records, 1):
        timestamp = format_timestamp(entry['createdAt'])
        execution_id = entry['executionId']
        record_type = entry['recordType']
        # These two IDs are optional in the record.
        record_id = entry.get('recordId', 'N/A')
        agent_space_id = entry.get('agentSpaceId', 'N/A')

        # Compact header carrying all IDs.
        print(f"\n{entry_rule}")
        print(f"πŸ“ Record {number:2d} | {timestamp}")
        print(f"πŸ”— ID: {record_id} | 🏒 Space: {agent_space_id} | βš™οΈ Exec: {execution_id}")
        print(f"πŸ“‹ Type: {record_type}")
        print(f"{entry_rule}")

        body = get_detailed_content(entry)
        print("πŸ“„ Content:")
        # Indent each non-blank line of the body.
        for text_line in filter(str.strip, body.split('\n')):
            print(f"   {text_line}")

        print()

    print(outer_rule)
297
298
def interactive_task_selection(client, agent_space_id):
    """Prompt the user to pick one of the ten most recently created tasks.

    Tasks are requested through ``client.list_tasks`` sorted by
    ``CREATED_AT`` descending. The user enters a 1-based number, or presses
    Enter to take the first task in the list; invalid answers re-prompt.

    Returns:
        The ``taskId`` of the chosen task, or ``None`` when no tasks exist
        or a ``DevOpsAgentError`` is raised while listing them.
    """
    print("\n🎯 Task Selection for Journal Timeline")
    print("-" * 42)

    try:
        listing = client.list_tasks(
            agentSpaceId=agent_space_id,
            limit=10,
            sortField='CREATED_AT',
            order='DESC',
        )

        tasks = listing.get('tasks', [])
        if not tasks:
            print("❌ No tasks found in this agent space")
            print("   Please create some tasks first, or check your agent space ID")
            return None

        print("Available tasks (recently created):")
        for number, task in enumerate(tasks, 1):
            # fromisoformat is given '+00:00' in place of a trailing 'Z'.
            created = datetime.fromisoformat(task['createdAt'].replace('Z', '+00:00'))
            print(f"   {number}. {task['title'][:50]}... ({task['status']}, {task['taskType']}) - {created:%Y-%m-%d %H:%M}")

        task_count = len(tasks)
        prompt = f"\nSelect a task (1-{task_count}) or press Enter for the most recent: "

        while True:
            try:
                answer = input(prompt).strip()
                if not answer:
                    chosen = tasks[0]
                    print(f"πŸ“Œ Using most recent task: {chosen['title'][:40]}...")
                    break

                index = int(answer) - 1
                if not 0 <= index < task_count:
                    print(f"❌ Invalid choice. Please enter 1-{task_count} or press Enter.")
                    continue

                chosen = tasks[index]
                print(f"πŸ“Œ Selected task: {chosen['title'][:40]}...")
                break
            except ValueError:
                print("❌ Invalid input. Please enter a number or press Enter.")

        return chosen['taskId']

    except DevOpsAgentError as err:
        print(f"❌ Failed to list tasks: {err}")
        return None
348
349
def main():
    """Fetch and print the journal timeline of one interactively chosen task.

    Steps: print the configuration, stop if the service is not registered,
    resolve the agent space ID, create a client, let the user pick a task,
    list its executions, gather the journal records of each execution, sort
    them by ``createdAt`` and print them with ``display_detailed_timeline``,
    followed by a summary and a legend.
    """
    print("πŸ“œ Community DevOps Agent API - Journal Records Timeline")
    print("=" * 65)

    # Load the shared example configuration and echo it.
    settings = get_config()
    settings.print_configuration()

    # Bail out early when the service is not registered.
    if not devopsagent_api.is_service_registered():
        print("❌ Service registration failed")
        return

    # The configured ID wins; otherwise defer to get_agent_space_id().
    agent_space_id = settings.agent_space_id or get_agent_space_id()
    print(f"Using agent space: {agent_space_id[:8]}...")

    try:
        client = settings.get_client()
        print("βœ… Client created successfully")
    except Exception as err:
        print(f"❌ Failed to create client: {err}")
        return

    task_id = interactive_task_selection(client, agent_space_id)
    if not task_id:
        print("\n❌ Cannot continue without a task to work with")
        return

    print("Fetching journal timeline for selected task...")

    # Step 1: every execution of the chosen task.
    executions = list_executions_for_task(client, agent_space_id, task_id)
    if not executions:
        print("❌ No executions found for this task")
        return

    # Step 2: journal records of each execution.
    print("\nπŸ“ Fetching journal records for all executions...")
    print("-" * 55)

    execution_count = len(executions)
    all_records = []
    for run_number, execution in enumerate(executions, 1):
        exec_id = execution['executionId']
        print(f"   [{run_number}/{execution_count}] Processing execution: {exec_id[:8]}...")

        records = fetch_journal_records_for_execution(client, agent_space_id, exec_id)

        # Tag a copy of each record with its execution ID for timeline context.
        for entry in records:
            all_records.append(dict(entry, executionId=exec_id))

        print(f"      Found {len(records)} journal records")

    # Step 3: chronological order across executions.
    print(f"\nπŸ”„ Sorting {len(all_records)} total records chronologically...")
    all_records.sort(key=lambda entry: entry['createdAt'])

    # Step 4: render.
    display_detailed_timeline(all_records)

    print("\nπŸŽ‰ Journal timeline generation complete!")
    print("\nπŸ’‘ Summary:")
    print(f"β€’ Task ID: {task_id}")
    print(f"β€’ Total executions: {execution_count}")
    print(f"β€’ Total journal records: {len(all_records)}")

    if executions:
        short_ids = [run['executionId'][:8] + '...' for run in executions]
        print(f"β€’ Execution IDs: {', '.join(short_ids)}")

    legend = (
        "\nπŸ“– Record Types Legend:",
        "β€’ message: User/assistant interactions",
        "β€’ plan: Planned activities",
        "β€’ activity: Task execution steps",
        "β€’ observation: System observations",
        "β€’ symptom: Identified issues",
        "β€’ finding: Analysis results",
        "β€’ investigation_summary: Final summary",
    )
    for text in legend:
        print(text)
434
435
436if __name__ == "__main__":
437    main()

Configuration

  1#!/usr/bin/env python3
  2"""
  3Configuration utilities for DevOps Agent API examples.
  4
  5This module provides centralized configuration management for examples,
  6including environment variable handling, service metadata discovery,
  7and sensible defaults.
  8"""
  9
 10import os
 11import boto3
 12import devopsagent_api
 13from typing import Dict, List, Optional, Any
 14
 15
 16class ExampleConfig:
 17    """Configuration manager for examples."""
 18
 19    def __init__(self):
 20        """Initialize configuration with environment variables and defaults."""
 21        self._region = os.getenv('AWS_REGION', 'us-east-1')
 22        self._agent_space_id = os.getenv('AGENT_SPACE_ID')
 23        self._user_id = os.getenv('USER_ID', 'example-user')
 24        self._poll_interval = int(os.getenv('POLL_INTERVAL', '2'))
 25        self._default_limit = int(os.getenv('DEFAULT_LIMIT', '10'))
 26        self._timeout_seconds = int(os.getenv('TIMEOUT_SECONDS', '60'))
 27        self._max_checks = int(os.getenv('MAX_CHECKS', '10'))
 28
 29        # Cached service metadata
 30        self._waiters = None
 31        self._paginators = None
 32        self._operations = None
 33
 34    @property
 35    def region(self) -> str:
 36        """Get AWS region."""
 37        return self._region
 38
 39    @property
 40    def agent_space_id(self) -> Optional[str]:
 41        """Get agent space ID."""
 42        return self._agent_space_id
 43
 44    @property
 45    def user_id(self) -> str:
 46        """Get user ID."""
 47        return self._user_id
 48
 49    @property
 50    def poll_interval(self) -> int:
 51        """Get polling interval in seconds."""
 52        return self._poll_interval
 53
 54    @property
 55    def default_limit(self) -> int:
 56        """Get default pagination limit."""
 57        return self._default_limit
 58
 59    @property
 60    def timeout_seconds(self) -> int:
 61        """Get default timeout in seconds."""
 62        return self._timeout_seconds
 63
 64    @property
 65    def max_checks(self) -> int:
 66        """Get maximum number of status checks."""
 67        return self._max_checks
 68
 69    def get_client(self) -> boto3.client:
 70        """Create and return a configured boto3 client."""
 71        return boto3.client('community-devops-agent', region_name=self.region)
 72
 73    def get_available_waiters(self) -> List[str]:
 74        """Get list of available waiters from service metadata."""
 75        if self._waiters is None:
 76            try:
 77                client = self.get_client()
 78                # Try to get waiter names by attempting to create them
 79                potential_waiters = [
 80                    'task_completed', 'task_failed', 'task_started',
 81                    'execution_completed', 'execution_failed'
 82                ]
 83                available = []
 84                for waiter_name in potential_waiters:
 85                    try:
 86                        client.get_waiter(waiter_name)
 87                        available.append(waiter_name)
 88                    except Exception:
 89                        continue
 90                self._waiters = available
 91            except Exception:
 92                # Fallback to known waiters
 93                self._waiters = ['task_completed', 'task_failed', 'task_started']
 94
 95        return self._waiters
 96
 97    def get_available_paginators(self) -> List[str]:
 98        """Get list of available paginators from service metadata."""
 99        if self._paginators is None:
100            try:
101                client = self.get_client()
102                potential_paginators = [
103                    'list_tasks', 'list_executions', 'get_journal_records',
104                    'list_goals', 'list_recommendations'
105                ]
106                available = []
107                for paginator_name in potential_paginators:
108                    try:
109                        client.get_paginator(paginator_name)
110                        available.append(paginator_name)
111                    except Exception:
112                        continue
113                self._paginators = available
114            except Exception:
115                # Fallback to known paginators
116                self._paginators = ['list_tasks', 'list_executions', 'get_journal_records']
117
118        return self._paginators
119
120    def get_operations_by_category(self) -> Dict[str, List[str]]:
121        """Get operations grouped by category."""
122        if self._operations is None:
123            try:
124                client = self.get_client()
125                operations = client.meta.service_model.operation_names
126
127                # Group operations by category
128                task_ops = [op for op in operations if 'task' in op.lower()]
129                journal_ops = [op for op in operations if any(x in op.lower() for x in ['journal', 'execution'])]
130                goal_ops = [op for op in operations if any(x in op.lower() for x in ['goal', 'recommendation'])]
131                other_ops = [op for op in operations if op not in task_ops + journal_ops + goal_ops]
132
133                self._operations = {
134                    'Task Management': sorted(task_ops),
135                    'Journal & Execution': sorted(journal_ops),
136                    'Goals & Recommendations': sorted(goal_ops),
137                    'Topology & Support': sorted(other_ops)
138                }
139            except Exception:
140                # Fallback to known operations
141                self._operations = {
142                    'Task Management': ['CreateTask', 'GetTask', 'ListTasks', 'UpdateTask'],
143                    'Journal & Execution': ['GetJournalRecords', 'ListExecutions'],
144                    'Goals & Recommendations': ['ListGoals', 'ListRecommendations', 'GetRecommendation', 'UpdateRecommendation'],
145                    'Topology & Support': ['QueryTopology', 'GetSupportLevel', 'CreateSupportChat', 'EndSupportChat']
146                }
147
148        return self._operations
149
150    def validate_configuration(self) -> List[str]:
151        """Validate current configuration and return any issues."""
152        issues = []
153
154        if not self.agent_space_id:
155            issues.append("AGENT_SPACE_ID environment variable not set (required for most operations)")
156
157        if not devopsagent_api.is_service_registered():
158            issues.append("Service registration failed - check dependencies")
159
160        return issues
161
162    def print_configuration(self):
163        """Print current configuration for debugging."""
164        print("πŸ”§ Example Configuration")
165        print("-" * 30)
166        print(f"AWS Region: {self.region}")
167        print(f"Agent Space ID: {self.agent_space_id or 'Not set'}")
168        print(f"User ID: {self.user_id}")
169        print(f"Poll Interval: {self.poll_interval}s")
170        print(f"Default Limit: {self.default_limit}")
171        print(f"Timeout: {self.timeout_seconds}s")
172        print(f"Max Checks: {self.max_checks}")
173
174        issues = self.validate_configuration()
175        if issues:
176            print("\n⚠️  Configuration Issues:")
177            for issue in issues:
178                print(f"  β€’ {issue}")
179        else:
180            print("\nβœ… Configuration looks good")
181
182
# Global configuration instance, built once when this module is imported;
# ExampleConfig reads its environment variables at that moment.
config = ExampleConfig()


def get_config() -> ExampleConfig:
    """Get the global configuration instance.

    Returns:
        The module-level ``config`` object; every call returns the same
        instance rather than constructing a new one.
    """
    return config

Running Examples

All examples can be run directly:

# Basic usage
python examples/basic_usage.py

# List tasks
python examples/list_tasks.py

# Interactive monitoring
python examples/interactive_monitoring_enhanced.py

Make sure to set your AGENT_SPACE_ID environment variable before running examples that require it.

Example Output

Here's what you can expect from running the basic usage example:

Configuration:
- Agent Space ID: your-agent-space-uuid
- AWS Region: us-east-1
- User ID: 4be32b4a-9675-4dc0-97ff-7126ad28457c
- Poll Interval: 2s
- Default Limit: 10
- Timeout: 60s
- Max Checks: 10

Service registered successfully
Client created successfully

Listing tasks...
Found 3 tasks

Task 1:
- ID: task-123
- Title: Deploy application
- Status: COMPLETED
- Created: 2025-12-09T10:30:00Z

Task 2:
- ID: task-456
- Title: Run tests
- Status: IN_PROGRESS
- Created: 2025-12-09T11:15:00Z

Task 3:
- ID: task-789
- Title: Update documentation
- Status: PENDING
- Created: 2025-12-09T12:00:00Z