Examplesο
This section contains complete examples demonstrating how to use the Community DevOps Agent API.
Basic Usageο
1#!/usr/bin/env python3
2"""
3Basic usage examples for the Community DevOps Agent API.
4
5This script demonstrates fundamental usage patterns for the boto3 integration,
6including service registration, client creation, and basic operations.
7"""
8
9import boto3
10import devopsagent_api
11from devopsagent_api.exceptions import DevOpsAgentError
12from config import get_config
13
14
15def main():
16 """Demonstrate basic usage of the DevOps Agent API."""
17
18 print("π€ Community DevOps Agent API - Basic Usage Examples")
19 print("=" * 60)
20
21 # Get configuration
22 config = get_config()
23 config.print_configuration()
24
25 # Check service registration status
26 print("\n1. Service Registration Status")
27 print("-" * 30)
28 if devopsagent_api.is_service_registered():
29 print("β
Service successfully registered with boto3")
30 else:
31 print("β Service registration failed")
32 print(" This may be due to missing dependencies or configuration issues")
33 return
34
35 # Create a client
36 print("\n2. Client Creation")
37 print("-" * 30)
38 try:
39 # Basic client creation using config
40 client = config.get_client()
41 print("β
Client created successfully")
42 print(f" Service name: {client.meta.service_model.service_name}")
43 print(f" API version: {client.meta.service_model.api_version}")
44 print(f" Endpoint: {client._endpoint.host}")
45
46 except Exception as e:
47 print(f"β Failed to create client: {e}")
48 return
49
50 # Demonstrate available operations
51 print("\n3. Available Operations")
52 print("-" * 30)
53 operations_by_category = config.get_operations_by_category()
54 total_ops = sum(len(ops) for ops in operations_by_category.values())
55 print(f"Total operations available: {total_ops}")
56
57 for category, operations in operations_by_category.items():
58 print(f"\n{category}:")
59 for op in operations:
60 print(f" β’ {op}")
61
62 # Demonstrate paginators
63 print("\n4. Available Paginators")
64 print("-" * 30)
65 try:
66 available_paginators = config.get_available_paginators()
67 print("Automatic pagination available for:")
68 for name in available_paginators:
69 try:
70 paginator = client.get_paginator(name)
71 print(f" β’ {name} (page size: 50-1000)")
72 except Exception:
73 print(f" β’ {name} (not available)")
74 except Exception as e:
75 print(f"β Error checking paginators: {e}")
76
77 # Demonstrate waiters
78 print("\n5. Available Waiters")
79 print("-" * 30)
80 try:
81 available_waiters = config.get_available_waiters()
82 print("Long-running operation waiters:")
83 for name in available_waiters:
84 try:
85 waiter = client.get_waiter(name)
86 print(f" β’ {name} (polling with backoff)")
87 except Exception:
88 print(f" β’ {name} (not available)")
89 except Exception as e:
90 print(f"β Error checking waiters: {e}")
91
92 # Demonstrate configuration options
93 print("\n6. Client Configuration Options")
94 print("-" * 30)
95 from botocore.config import Config
96
97 # Example configurations
98 configs = [
99 ("Default", {}),
100 ("With retries", {"retries": {"max_attempts": 5}}),
101 ("With timeouts", {"connect_timeout": 10, "read_timeout": 30}),
102 ("With retry mode", {"retries": {"mode": "adaptive"}}),
103 ]
104
105 for name, config_dict in configs:
106 try:
107 boto_config = Config(**config_dict)
108 test_client = boto3.client(
109 'community-devops-agent',
110 region_name=config.region,
111 config=boto_config
112 )
113 print(f" β’ {name}: β
Configured successfully")
114 except Exception as e:
115 print(f" β’ {name}: β Configuration failed: {e}")
116
117 # Demonstrate session usage
118 print("\n7. Using boto3 Sessions")
119 print("-" * 30)
120 try:
121 # Create a session (useful for credential management)
122 session = boto3.Session(region_name=config.region)
123
124 # Create client from session
125 session_client = session.client('community-devops-agent')
126 print("β
Session-based client created successfully")
127 print(f" Region: {session.region_name}")
128 print(f" Profile: {session.profile_name or 'default'}")
129
130 except Exception as e:
131 print(f"β Session client creation failed: {e}")
132
133 # Error handling demonstration
134 print("\n8. Error Handling")
135 print("-" * 30)
136 print("The library provides comprehensive error handling:")
137 print("β’ DevOpsAgentError: Base exception for API errors")
138 print("β’ AuthenticationError: Credential and auth issues")
139 print("β’ ValidationException: Invalid request parameters")
140 print("β’ ResourceNotFoundException: Missing resources")
141 print("β’ ThrottlingException: Rate limiting")
142 print("β’ And more... (see devopsagent_api.exceptions)")
143
144 # Demonstrate import patterns
145 print("\n9. Import Patterns")
146 print("-" * 30)
147 print("Recommended import pattern:")
148 print(" import devopsagent_api # Registers service")
149 print(" import boto3")
150 print(" client = boto3.client('community-devops-agent')")
151 print()
152 print("For type hints and models:")
153 print(" from devopsagent_api.models import Task, TaskStatus")
154 print(" from devopsagent_api.exceptions import DevOpsAgentError")
155
156 print("\nπ Basic usage demonstration complete!")
157 print("\nNext steps:")
158 print("β’ Run 'python examples/list_tasks.py' for task operations")
159 print("β’ Check examples/README.md for detailed usage patterns")
160 print("β’ See implementationPlan.md for development roadmap")
161
162
163if __name__ == "__main__":
164 main()
List Tasksο
1#!/usr/bin/env python3
2"""
3Task listing examples for the Community DevOps Agent API.
4
5This script demonstrates how to list and filter tasks using the boto3 integration,
6including pagination, sorting, and various filter combinations.
7"""
8
9import os
10import sys
11import boto3
12import devopsagent_api
13from devopsagent_api.models import TaskStatus, TaskType, TaskPriority, SortField, SortOrder
14from devopsagent_api.exceptions import DevOpsAgentError
15from datetime import datetime, timedelta
16from config import get_config
17
18
19def get_agent_space_id():
20 """Get agent space ID from environment or prompt user."""
21 agent_space_id = os.getenv('AGENT_SPACE_ID')
22 if not agent_space_id:
23 print("β AGENT_SPACE_ID environment variable not set")
24 print(" Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
25 print(" Or run: AGENT_SPACE_ID=your-uuid python examples/list_tasks.py")
26 sys.exit(1)
27 return agent_space_id
28
29
30def list_all_tasks(client, agent_space_id):
31 """Demonstrate basic task listing without filters."""
32 print("\nπ Basic Task Listing")
33 print("-" * 30)
34
35 try:
36 response = client.list_tasks(agentSpaceId=agent_space_id)
37
38 tasks = response.get('tasks', [])
39 print(f"Found {len(tasks)} tasks")
40
41 if tasks:
42 for i, task in enumerate(tasks[:5], 1): # Show first 5
43 print(f"{i}. {task['title'][:50]}... ({task['status']})")
44 if len(tasks) > 5:
45 print(f" ... and {len(tasks) - 5} more tasks")
46 else:
47 print(" No tasks found in this agent space")
48
49 return tasks
50
51 except DevOpsAgentError as e:
52 print(f"β API Error: {e}")
53 return []
54 except Exception as e:
55 print(f"β Unexpected error: {e}")
56 return []
57
58
59def list_tasks_with_filters(client, agent_space_id):
60 """Demonstrate task listing with various filters."""
61 print("\nπ Task Listing with Filters")
62 print("-" * 35)
63
64 # Filter for completed investigation tasks
65 print("\n1. Completed Investigation Tasks")
66 try:
67 response = client.list_tasks(
68 agentSpaceId=agent_space_id,
69 filter={
70 'taskType': ['INVESTIGATION'],
71 'status': ['COMPLETED']
72 },
73 limit=10
74 )
75 tasks = response.get('tasks', [])
76 print(f" Found {len(tasks)} completed investigation tasks")
77 for task in tasks:
78 print(f" β’ {task['title'][:40]}... (Priority: {task['priority']})")
79
80 except DevOpsAgentError as e:
81 print(f" β Filter error: {e}")
82
83 # Filter for high-priority tasks
84 print("\n2. High Priority Tasks")
85 try:
86 response = client.list_tasks(
87 agentSpaceId=agent_space_id,
88 filter={
89 'priority': ['HIGH'] # Single item list
90 },
91 limit=5
92 )
93 tasks = response.get('tasks', [])
94 print(f" Found {len(tasks)} high-priority tasks")
95 for task in tasks:
96 print(f" β’ {task['title'][:40]}... ({task['priority']}, {task['status']})")
97
98 except DevOpsAgentError as e:
99 print(f" β Filter error: {e}")
100
101 # Filter by date range (last 7 days)
102 print("\n3. Recently Created Tasks (Last 7 Days)")
103 try:
104 seven_days_ago = (datetime.utcnow() - timedelta(days=7)).isoformat() + 'Z'
105 response = client.list_tasks(
106 agentSpaceId=agent_space_id,
107 filter={
108 'createdAfter': seven_days_ago
109 },
110 sortField='CREATED_AT',
111 order='DESC'
112 )
113 tasks = response.get('tasks', [])
114 print(f" Found {len(tasks)} tasks created in the last 7 days")
115 for task in tasks[:3]:
116 created = datetime.fromisoformat(task['createdAt'].replace('Z', '+00:00'))
117 print(f" β’ {task['title'][:35]}... (Created: {created.strftime('%Y-%m-%d %H:%M')})")
118
119 except DevOpsAgentError as e:
120 print(f" β Date filter error: {e}")
121
122
123def demonstrate_pagination(client, agent_space_id):
124 """Demonstrate automatic pagination using boto3 paginators."""
125 print("\nπ Pagination Examples")
126 print("-" * 25)
127
128 # Manual pagination
129 print("\n1. Manual Pagination")
130 try:
131 page_size = 3
132 next_token = None
133 total_tasks = 0
134
135 while True:
136 response = client.list_tasks(
137 agentSpaceId=agent_space_id,
138 limit=page_size,
139 nextToken=next_token
140 )
141
142 tasks = response.get('tasks', [])
143 print(f" Page with {len(tasks)} tasks (total so far: {total_tasks + len(tasks)})")
144
145 if tasks:
146 for task in tasks:
147 print(f" β’ {task['title'][:30]}...")
148
149 total_tasks += len(tasks)
150 next_token = response.get('next_token')
151
152 if not next_token or total_tasks >= 9: # Limit for demo
153 break
154
155 print(f" Total tasks retrieved: {total_tasks}")
156
157 except DevOpsAgentError as e:
158 print(f" β Manual pagination error: {e}")
159
160 # Automatic pagination with boto3 paginator
161 print("\n2. Automatic Pagination (Paginator)")
162 try:
163 paginator = client.get_paginator('list_tasks')
164 page_iterator = paginator.paginate(
165 agentSpaceId=agent_space_id,
166 PaginationConfig={
167 'MaxItems': 10, # Total items to retrieve
168 'PageSize': 2 # Items per page
169 }
170 )
171
172 total_retrieved = 0
173 page_count = 0
174
175 for page in page_iterator:
176 page_count += 1
177 tasks = page.get('tasks', [])
178 total_retrieved += len(tasks)
179 print(f" Page {page_count}: {len(tasks)} tasks")
180
181 print(f" Total tasks retrieved with paginator: {total_retrieved}")
182
183 except DevOpsAgentError as e:
184 print(f" β Paginator error: {e}")
185
186
187def demonstrate_sorting(client, agent_space_id):
188 """Demonstrate different sorting options."""
189 print("\nπ Sorting Examples")
190 print("-" * 20)
191
192 sort_options = [
193 ('CREATED_AT', 'DESC', 'Newest First'),
194 ('CREATED_AT', 'ASC', 'Oldest First'),
195 ('UPDATED_AT', 'DESC', 'Recently Updated'),
196 ('PRIORITY', 'DESC', 'Highest Priority First'),
197 ]
198
199 for sort_field, order, description in sort_options:
200 try:
201 response = client.list_tasks(
202 agentSpaceId=agent_space_id,
203 sortField=sort_field,
204 order=order,
205 limit=3
206 )
207
208 tasks = response.get('tasks', [])
209 print(f"\n{sort_field} {order} ({description}):")
210 for i, task in enumerate(tasks, 1):
211 if sort_field == 'PRIORITY':
212 key_value = task['priority']
213 elif sort_field == 'CREATED_AT':
214 key_value = task['createdAt'][:10] # Date part only
215 elif sort_field == 'UPDATED_AT':
216 key_value = task['updatedAt'][:10]
217 else:
218 key_value = "N/A"
219
220 print(f" {i}. [{key_value}] {task['title'][:30]}...")
221
222 except DevOpsAgentError as e:
223 print(f" β Sorting error for {sort_field} {order}: {e}")
224
225
226def demonstrate_error_handling(client, agent_space_id):
227 """Demonstrate error handling patterns."""
228 print("\nπ¨ Error Handling Examples")
229 print("-" * 28)
230
231 # Invalid agent space ID
232 print("\n1. Invalid Agent Space ID")
233 try:
234 response = client.list_tasks(agentSpaceId="invalid-uuid")
235 print(" β Should have failed with invalid UUID")
236 except DevOpsAgentError as e:
237 print(f" β
Correctly caught error: {type(e).__name__}")
238 print(f" Message: {str(e)[:60]}...")
239
240 # Invalid filter values
241 print("\n2. Invalid Filter Values")
242 try:
243 response = client.list_tasks(
244 agentSpaceId=agent_space_id,
245 filter={
246 'status': ['INVALID_STATUS']
247 }
248 )
249 print(" β Should have failed with invalid status")
250 except DevOpsAgentError as e:
251 print(f" β
Correctly caught validation error: {type(e).__name__}")
252
253 # Network/permission errors would be caught here in real usage
254 print("\n3. Network/Permission Error Simulation")
255 print(" (In real usage, these would be caught automatically)")
256 print(" β’ ThrottlingException: Rate limiting")
257 print(" β’ UnauthorizedException: Invalid credentials")
258 print(" β’ ResourceNotFoundException: Agent space not found")
259
260
261def performance_comparison(client, agent_space_id):
262 """Compare performance of different querying approaches."""
263 print("\nβ‘ Performance Comparison")
264 print("-" * 27)
265
266 import time
267
268 # Get all tasks first
269 try:
270 start_time = time.time()
271 all_response = client.list_tasks(agentSpaceId=agent_space_id)
272 all_tasks = all_response.get('tasks', [])
273 fetch_time = time.time() - start_time
274
275 print(f" Fetch all tasks: {fetch_time:.2f}s ({len(all_tasks)} tasks)")
276
277 # Simulate client-side filtering
278 start_time = time.time()
279 completed_tasks = [t for t in all_tasks if t['status'] == 'COMPLETED']
280 client_filter_time = time.time() - start_time
281
282 print(f" Client-side filter: {client_filter_time:.2f}s ({len(completed_tasks)} tasks)")
283
284 # Server-side filtering
285 start_time = time.time()
286 server_response = client.list_tasks(
287 agentSpaceId=agent_space_id,
288 filter={'status': ['COMPLETED']}
289 )
290 server_tasks = server_response.get('tasks', [])
291 server_filter_time = time.time() - start_time
292
293 print(f" Server-side filter: {server_filter_time:.2f}s ({len(server_tasks)} tasks)")
294 print(f" Results match: {len(completed_tasks) == len(server_tasks)}")
295
296 except DevOpsAgentError as e:
297 print(f" β Performance test error: {e}")
298
299
300def main():
301 """Main demonstration function."""
302 print("π Community DevOps Agent API - Task Listing Examples")
303 print("=" * 65)
304
305 # Get configuration
306 config = get_config()
307 config.print_configuration()
308
309 # Check service registration
310 if not devopsagent_api.is_service_registered():
311 print("β Service registration failed")
312 return
313
314 # Get agent space ID
315 agent_space_id = config.agent_space_id or get_agent_space_id()
316 print(f"Using agent space: {agent_space_id[:8]}...")
317
318 # Create client with direct AWS SigV4 authentication
319 try:
320 # Create a session with explicit credentials
321 session = boto3.Session(
322 aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
323 aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
324 aws_session_token=os.getenv('AWS_SESSION_TOKEN'),
325 region_name=config.region
326 )
327
328 # Create boto3 client with explicit session for direct AWS SigV4 authentication
329 client = session.client('community-devops-agent')
330 print("β
Client created successfully with direct AWS SigV4 authentication")
331 except Exception as e:
332 print(f"β Failed to create client: {e}")
333 return
334
335 # Run demonstrations
336 list_all_tasks(client, agent_space_id)
337 list_tasks_with_filters(client, agent_space_id)
338 demonstrate_pagination(client, agent_space_id)
339 demonstrate_sorting(client, agent_space_id)
340 demonstrate_error_handling(client, agent_space_id)
341 performance_comparison(client, agent_space_id)
342
343 print("\nπ Task listing demonstration complete!")
344 print("\nπ‘ Key Takeaways:")
345 print("β’ Use filters to reduce data transfer and improve performance")
346 print("β’ Prefer server-side filtering over client-side when possible")
347 print("β’ Use pagination for large result sets")
348 print("β’ Sort by priority for urgent task identification")
349 print("β’ Handle errors gracefully in production code")
350
351 print("\nπ Next: Run 'python examples/get_task.py' to learn about individual task operations")
352
353
354if __name__ == "__main__":
355 main()
Get Taskο
1#!/usr/bin/env python3
2"""
3Individual task operations examples for the Community DevOps Agent API.
4
5This script demonstrates how to retrieve, monitor, and update individual tasks
6using the boto3 integration, including waiters and status monitoring.
7"""
8
9import os
10import sys
11import time
12import boto3
13import devopsagent_api
14from devopsagent_api.models import TaskStatus, TaskPriority
15from devopsagent_api.exceptions import DevOpsAgentError
16from datetime import datetime
17from config import get_config
18
19
20def get_agent_space_id():
21 """Get agent space ID from environment or prompt user."""
22 agent_space_id = os.getenv('AGENT_SPACE_ID')
23 if not agent_space_id:
24 print("β AGENT_SPACE_ID environment variable not set")
25 print(" Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
26 print(" Or run: AGENT_SPACE_ID=your-uuid python examples/get_task.py")
27 sys.exit(1)
28 return agent_space_id
29
30
31def get_task_by_id(client, agent_space_id, task_id, config):
32 """Retrieve a specific task by ID."""
33 print(f"\nπ Getting Task: {task_id[:8]}...")
34 print("-" * 40)
35
36 try:
37 response = client.get_task(
38 agentSpaceId=agent_space_id,
39 taskId=task_id
40 )
41
42 task = response.get('task', {})
43 if task:
44 print("β
Task retrieved successfully")
45 print(f" Title: {task['title']}")
46 print(f" Status: {task['status']}")
47 print(f" Priority: {task['priority']}")
48 print(f" Type: {task['taskType']}")
49 print(f" Created: {task['createdAt'][:19].replace('T', ' ')}")
50 print(f" Updated: {task['updatedAt'][:19].replace('T', ' ')}")
51
52 # Show description (truncated)
53 desc = task.get('description', '')
54 if desc:
55 print(f" Description: {desc[:100]}{'...' if len(desc) > 100 else ''}")
56
57 # Show ALL available fields
58 print("\nπ Complete Task Data Fields:")
59 print("-" * 40)
60 for key, value in task.items():
61 if key == 'description' and len(str(value)) > 100:
62 print(f" {key}: {str(value)[:100]}...")
63 else:
64 print(f" {key}: {value}")
65
66 return task
67 else:
68 print("β Task not found or empty response")
69 return None
70
71 except DevOpsAgentError as e:
72 print(f"β Failed to get task: {e}")
73 return None
74
75
76def monitor_task_status(client, agent_space_id, task_id, config, max_checks=None):
77 """Monitor a task's status changes over time."""
78 print(f"\nπ Monitoring Task Status: {task_id[:8]}...")
79 print("-" * 45)
80
81 if max_checks is None:
82 max_checks = config.max_checks
83
84 previous_status = None
85
86 for check in range(max_checks):
87 try:
88 response = client.get_task(
89 agentSpaceId=agent_space_id,
90 taskId=task_id
91 )
92
93 task = response.get('task', {})
94 if task:
95 current_status = task['status']
96 updated_at = task['updatedAt'][:19].replace('T', ' ')
97
98 if current_status != previous_status:
99 print(f" Check {check + 1}: Status = {current_status} (at {updated_at})")
100 previous_status = current_status
101
102 # Check if task is in a final state
103 if current_status in ['COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT']:
104 print(f" β
Task reached final status: {current_status}")
105 return task
106 else:
107 print(f" Check {check + 1}: Status unchanged = {current_status}")
108
109 else:
110 print(f" Check {check + 1}: Task not found")
111 return None
112
113 except DevOpsAgentError as e:
114 print(f" Check {check + 1}: Error - {e}")
115 if "ResourceNotFoundException" in str(e):
116 return None
117
118 # Wait before next check (except on last iteration)
119 if check < max_checks - 1:
120 print(" Waiting 2 seconds...")
121 time.sleep(2)
122
123 print(f" β° Monitoring complete after {max_checks} checks")
124 return task if 'task' in locals() else None
125
126
127def update_task_status(client, agent_space_id, task_id, new_status, reason="Updated via API example"):
128 """Update a task's status."""
129 print(f"\nπ Updating Task Status: {task_id[:8]}...")
130 print("-" * 42)
131
132 # Validate status
133 valid_statuses = [s.value for s in TaskStatus]
134 if new_status not in valid_statuses:
135 print(f"β Invalid status '{new_status}'. Valid options: {', '.join(valid_statuses)}")
136 return None
137
138 try:
139 response = client.update_task(
140 agentSpaceId=agent_space_id,
141 taskId=task_id,
142 taskStatus=new_status
143 )
144
145 updated_task = response.get('task', {})
146 if updated_task:
147 print("β
Task status updated successfully")
148 print(f" New Status: {updated_task['status']}")
149 print(f" Updated At: {updated_task['updatedAt'][:19].replace('T', ' ')}")
150 return updated_task
151 else:
152 print("β Update failed - no task data returned")
153 return None
154
155 except DevOpsAgentError as e:
156 print(f"β Failed to update task: {e}")
157 return None
158
159
160def initiate_mitigation_plan(client, agent_space_id, task_id, current_version, reason="HUMAN_APPROVE_MITIGATION"):
161 """
162 Initiate a mitigation plan for a task.
163
164 This function updates the task status to PENDING_START and sets the appropriate
165 metadata to trigger mitigation plan initiation, matching the behavior of the
166 provided cURL command.
167
168 Args:
169 client: boto3 client for community-devops-agent
170 agent_space_id: Agent space UUID
171 task_id: Task UUID
172 current_version: Current version of the task for optimistic concurrency
173 reason: Reason for mitigation initiation (default: HUMAN_APPROVE_MITIGATION)
174
175 Returns:
176 Updated task dict or None if failed
177 """
178 print(f"\nπ‘οΈ Initiating Mitigation Plan: {task_id[:8]}...")
179 print("-" * 50)
180
181 try:
182 response = client.update_task(
183 agentSpaceId=agent_space_id,
184 taskId=task_id,
185 taskStatus="PENDING_START",
186 currentVersion=current_version,
187 metadata={"queueingReason": reason}
188 )
189
190 updated_task = response.get('task', {})
191 if updated_task:
192 print("β
Mitigation plan initiated successfully")
193 print(f" Task Status: {updated_task['status']}")
194 print(f" Queueing Reason: {updated_task.get('metadata', {}).get('queueingReason', 'N/A')}")
195 print(f" Updated At: {updated_task['updatedAt'][:19].replace('T', ' ')}")
196 return updated_task
197 else:
198 print("β Mitigation initiation failed - no task data returned")
199 return None
200
201 except DevOpsAgentError as e:
202 print(f"β Failed to initiate mitigation: {e}")
203 return None
204
205
206def demonstrate_waiters(client, agent_space_id, task_id):
207 """Demonstrate waiter functionality for task monitoring."""
208 print(f"\nβ³ Demonstrating Waiters for Task: {task_id[:8]}...")
209 print("-" * 50)
210
211 waiter_types = [
212 ('task_completed', 'Task Completion'),
213 ('task_failed', 'Task Failure'),
214 ('task_started', 'Task Started')
215 ]
216
217 for waiter_name, description in waiter_types:
218 print(f"\n{description} ({waiter_name}):")
219
220 try:
221 waiter = client.get_waiter(waiter_name)
222 print(f" Waiter available: {waiter_name}")
223
224 # Show waiter configuration
225 config = waiter.config
226 print(f" Max attempts: {config.get('max_attempts', 'N/A')}")
227 print(f" Delay: {config.get('delay', 'N/A')} seconds")
228
229 # Note: We don't actually wait here as it would block the demo
230 # In real usage: waiter.wait(agent_space_id=agent_space_id, task_id=task_id)
231
232 except Exception as e:
233 print(f" Waiter not available: {e}")
234
235
236def demonstrate_error_handling(client, agent_space_id):
237 """Demonstrate error handling for task operations."""
238 print("\nπ¨ Error Handling Examples")
239 print("-" * 30)
240
241 # Try to get a non-existent task
242 print("\n1. Non-existent Task ID")
243 fake_task_id = "00000000-0000-0000-0000-000000000000"
244 try:
245 response = client.get_task(
246 agentSpaceId=agent_space_id,
247 taskId=fake_task_id
248 )
249 print(" β Should have failed with ResourceNotFoundException")
250 except Exception as e:
251 print(f" β
Correctly caught error: {type(e).__name__}")
252 if "ResourceNotFoundException" in str(e) or "not found" in str(e).lower():
253 print(" β
Error indicates task not found")
254
255 # Try to update with invalid status
256 print("\n2. Invalid Status Update")
257 # First get a real task ID to attempt update
258 try:
259 list_response = client.list_tasks(agentSpaceId=agent_space_id, limit=1)
260 tasks = list_response.get('tasks', [])
261 if tasks:
262 real_task_id = tasks[0]['taskId']
263 try:
264 client.update_task(
265 agentSpaceId=agent_space_id,
266 taskId=real_task_id,
267 taskStatus="INVALID_STATUS"
268 )
269 print(" β Should have failed with validation error")
270 except DevOpsAgentError as e:
271 print(f" β
Correctly caught validation error: {type(e).__name__}")
272 else:
273 print(" β οΈ No tasks available to test with")
274 except DevOpsAgentError as e:
275 print(f" β Unexpected error during setup: {e}")
276
277 # Invalid agent space ID
278 print("\n3. Invalid Agent Space ID")
279 try:
280 response = client.get_task(
281 agentSpaceId="invalid-uuid",
282 taskId=fake_task_id
283 )
284 print(" β Should have failed with validation error")
285 except DevOpsAgentError as e:
286 print(f" β
Correctly caught error: {type(e).__name__}")
287
288
289def interactive_task_selection(client, agent_space_id):
290 """Allow user to select a task interactively for demonstration."""
291 print("\nπ― Task Selection for Demonstration")
292 print("-" * 38)
293
294 try:
295 # Get recent tasks
296 response = client.list_tasks(
297 agentSpaceId=agent_space_id,
298 limit=5,
299 sortField='CREATED_AT',
300 order='DESC'
301 )
302
303 tasks = response.get('tasks', [])
304 if not tasks:
305 print("β No tasks found in this agent space")
306 print(" Please create some tasks first, or check your agent space ID")
307 return None
308
309 print("Available tasks (recently created):")
310 for i, task in enumerate(tasks, 1):
311 created = datetime.fromisoformat(task['createdAt'].replace('Z', '+00:00'))
312 print(f" {i}. {task['title'][:40]}... ({task['status']}) - {created.strftime('%Y-%m-%d %H:%M')}")
313
314 # For demo purposes, use the first task
315 selected_task = tasks[0]
316 print(f"\nπ Using task: {selected_task['title'][:40]}... (ID: {selected_task['taskId'][:8]}...)")
317 return selected_task['taskId']
318
319 except DevOpsAgentError as e:
320 print(f"β Failed to list tasks: {e}")
321 return None
322
323
324def main():
325 """Main demonstration function."""
326 print("π Community DevOps Agent API - Individual Task Operations")
327 print("=" * 65)
328
329 # Get configuration
330 config = get_config()
331 config.print_configuration()
332
333 # Check service registration
334 if not devopsagent_api.is_service_registered():
335 print("β Service registration failed")
336 return
337
338 # Get agent space ID
339 agent_space_id = config.agent_space_id or get_agent_space_id()
340 print(f"Using agent space: {agent_space_id[:8]}...")
341
342 # Create client
343 try:
344 client = config.get_client()
345 print("β
Client created successfully")
346 except Exception as e:
347 print(f"β Failed to create client: {e}")
348 return
349
350 # Select a task for demonstration
351 task_id = interactive_task_selection(client, agent_space_id)
352 if not task_id:
353 print("\nβ Cannot continue without a task to work with")
354 return
355
356 # Demonstrate operations
357 task = get_task_by_id(client, agent_space_id, task_id, config)
358 if task:
359 # Only demonstrate status updates if task is not already completed
360 if task['status'] not in ['COMPLETED', 'FAILED', 'CANCELLED']:
361 # Note: In real usage, be careful about status updates
362 print("\nβ οΈ Note: Status updates are commented out to avoid modifying real tasks")
363 print(" Uncomment the following lines to test status updates:")
364 print(" # update_task_status(client, agent_space_id, task_id, 'IN_PROGRESS')")
365 print(" # initiate_mitigation_plan(client, agent_space_id, task_id, task['version'])")
366 else:
367 print(f"\nπ Task is already in final status: {task['status']}")
368
369 # Demonstrate monitoring (safe to run)
370 monitor_task_status(client, agent_space_id, task_id, config, max_checks=3)
371
372 # Demonstrate waiters
373 demonstrate_waiters(client, agent_space_id, task_id)
374
375 # Demonstrate error handling
376 demonstrate_error_handling(client, agent_space_id)
377
378 print("\nπ Individual task operations demonstration complete!")
379 print("\nπ‘ Key Takeaways:")
380 print("β’ Use GetTask to retrieve complete task information")
381 print("β’ Monitor task progress with periodic status checks")
382 print("β’ Use waiters for event-driven task completion waiting")
383 print("β’ Update task status to reflect progress or resolution")
384 print("β’ Handle ResourceNotFoundException for missing tasks")
385 print("β’ Validate status values before updates")
386
387 print("\nπ Next Steps:")
388 print("β’ Run 'python examples/query_journal.py' for execution tracking")
389 print("β’ Run 'python examples/topology_discovery.py' for infrastructure discovery")
390 print("β’ Check examples/README.md for more usage patterns")
391
392
393if __name__ == "__main__":
394 main()
Interactive Monitoringο
Basic interactive monitoring:
1#!/usr/bin/env python3
2"""
3Timed Interactive Agent Monitoring Demo
4
5This example demonstrates monitoring with timed intervention messages:
61. Full JSON parsing of all record types
72. Rich formatted output with all details
83. Configurable verbosity levels
94. Support for all record types (message, activity, observation, etc.)
105. Timed intervention messages at specific intervals
11
12The script shows EVERYTHING - all records, parsed JSON, metadata, etc.
13"""
14
15import devopsagent_api
16import boto3
17from botocore.exceptions import ClientError, WaiterError
18import time
19from datetime import datetime
20import os
21import json
22from typing import Dict, List, Any, Optional, Tuple
23from config import get_config
24
25
26class TimedAgentMonitor:
27 """Monitor an active agent investigation with timed intervention messages."""
28
29 def __init__(self, client, agent_space_id, user_id='interactive-user',
30 show_json=False, show_thinking=True, verbose=True):
31 self.client = client
32 self.agent_space_id = agent_space_id
33 self.user_id = user_id
34 self.seen_record_ids = set()
35 self.record_count = 0
36 self.message_count = 0
37 self.interventions_sent = set() # Track which interventions have been sent
38
39 # Display options
40 self.show_json = show_json
41 self.show_thinking = show_thinking
42 self.verbose = verbose
43
44 # Record type emojis
45 self.type_emojis = {
46 'message': 'π¬',
47 'activity': 'βοΈ',
48 'observation': 'ποΈ',
49 'symptom': 'β οΈ',
50 'finding': 'π',
51 'plan': 'π',
52 'investigation_summary': 'π'
53 }
54
55 def create_task(self, title, description):
56 """Create a new investigation task."""
57 print("\n" + "="*80)
58 print("CREATING INVESTIGATION TASK")
59 print("="*80)
60
61 try:
62 response = self.client.create_task(
63 agentSpaceId=self.agent_space_id,
64 title=title,
65 description=description,
66 taskType='INVESTIGATION',
67 priority='HIGH'
68 )
69
70 task = response['task']
71 task_id = task['taskId']
72
73 print(f"β Task created successfully!")
74 print(f" Task ID: {task_id}")
75 print(f" Title: {title}")
76 print(f" Status: {task['status']}")
77 print(f" Priority: {task['priority']}")
78
79 return task_id
80
81 except ClientError as e:
82 print(f"β Error creating task: {e}")
83 return None
84
85 def wait_for_task_start(self, task_id, timeout=60):
86 """Wait for task to start using waiter."""
87 print(f"\n{'β'*80}")
88 print("WAITING FOR AGENT TO START")
89 print(f"{'β'*80}")
90
91 try:
92 waiter = self.client.get_waiter('task_started')
93
94 print(f"β³ Waiting for task to start (timeout: {timeout}s)...")
95
96 waiter.wait(
97 agentSpaceId=self.agent_space_id,
98 taskId=task_id,
99 WaiterConfig={
100 'Delay': 2,
101 'MaxAttempts': timeout // 2
102 }
103 )
104
105 print(f"β Agent has started working on the task!")
106 return True
107
108 except WaiterError as e:
109 print(f"β Timeout waiting for task to start: {e}")
110 return False
111 except ClientError as e:
112 print(f"β Error: {e}")
113 return False
114
115 def get_execution_id_from_chat(self, task_id):
116 """Get the execution ID by sending a chat message to the task."""
117 try:
118 print(f" Getting execution ID by sending initial chat message to task {task_id}")
119
120 # Send an initial chat message to create/get the chat execution
121 # Note: executionId is not required for the first message
122 response = self.client.send_chat_message(
123 agentSpaceId=self.agent_space_id,
124 taskId=task_id,
125 userId=self.user_id
126 )
127
128 # Extract execution ID from response (should be at top level)
129 execution_id = response.get('executionId')
130 if execution_id:
131 print(f" β Got execution ID from chat response: {execution_id}")
132 return execution_id
133
134 # Fallback: check if it's in the messages
135 messages = response.get('messages', [])
136 print(f" β οΈ No executionId at top level, checking messages: {messages}")
137
138 print(f" β No execution ID found in chat response")
139 print(f" Full response: {response}")
140 return None
141
142 except ClientError as e:
143 print(f"β Error sending initial chat message: {e}")
144 return None
145
146 def send_message(self, task_id, execution_id, message, intervention_id=None):
147 """Send a chat message to the agent."""
148 try:
149 # Debug: show the request parameters
150 print(f"\nπ DEBUG - SendChatMessage Request:")
151 print(f" agentSpaceId: {self.agent_space_id}")
152 print(f" taskId: {task_id}")
153 print(f" executionId: {execution_id}")
154 print(f" content: {message}")
155 print(f" userId: {self.user_id}")
156 print()
157
158 response = self.client.send_chat_message(
159 agentSpaceId=self.agent_space_id,
160 taskId=task_id,
161 executionId=execution_id,
162 content=message,
163 userId=self.user_id
164 )
165
166 intervention_label = f" (#{intervention_id + 1})" if intervention_id is not None else ""
167 print(f"\n{'β'+'β'*78+'β'}")
168 print(f"β π¬ TIMED USER INTERVENTION{intervention_label}{' '*(58-len(intervention_label))}β")
169 print(f"β{' '*78}β")
170 print(f"β Message: {message[:64]:<64}β")
171 if len(message) > 64:
172 print(f"β {message[64:]:<64}β")
173 print(f"β Message ID: {response['messages'][0]['messageId']:<59}β")
174 print(f"β{'β'*78}β\n")
175
176 if intervention_id is not None:
177 self.interventions_sent.add(intervention_id)
178 return True
179
180 except ClientError as e:
181 print(f"\nβ Error sending message: {e}\n")
182 return False
183
184 def fetch_new_records(self, execution_id):
185 """Fetch new journal records (all types)."""
186 try:
187 response = self.client.get_journal_records(
188 agentSpaceId=self.agent_space_id,
189 executionId=execution_id,
190 order='ASC',
191 limit=100
192 )
193
194 records = response.get('records', [])
195
196 # Filter out records we've already seen
197 new_records = [
198 r for r in records
199 if r['recordId'] not in self.seen_record_ids
200 ]
201
202 # Mark as seen
203 for record in new_records:
204 self.seen_record_ids.add(record['recordId'])
205
206 return new_records
207
208 except ClientError as e:
209 # Silently handle errors during polling
210 return []
211
212 def parse_message_content(self, content_str: str) -> Optional[Dict]:
213 """Parse JSON message content."""
214 try:
215 return json.loads(content_str)
216 except (json.JSONDecodeError, TypeError):
217 return None
218
219 def extract_text_from_content(self, content_data: Dict) -> List[str]:
220 """Extract readable text from parsed message content."""
221 texts = []
222
223 if not isinstance(content_data, dict):
224 return texts
225
226 # Extract from content array
227 content_array = content_data.get('content', [])
228 if isinstance(content_array, list):
229 for item in content_array:
230 if isinstance(item, dict):
231 # Text content
232 if 'text' in item:
233 texts.append(('text', item['text']))
234
235 # Thinking content
236 if 'thinking' in item and self.show_thinking:
237 texts.append(('thinking', item['thinking']))
238
239 # Tool use
240 if 'toolUse' in item:
241 tool_use = item['toolUse']
242 tool_name = tool_use.get('name', 'unknown')
243 texts.append(('tool_use', f"Tool: {tool_name}"))
244
245 # Tool result
246 if 'toolResult' in item:
247 tool_result = item['toolResult']
248 tool_id = tool_result.get('toolUseId', 'unknown')
249 texts.append(('tool_result', f"Tool Result: {tool_id}"))
250
251 return texts
252
253 def format_timestamp(self, timestamp: str) -> str:
254 """Format timestamp for display."""
255 if not timestamp or timestamp == 'N/A':
256 return 'N/A'
257
258 try:
259 dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
260 return dt.strftime('%H:%M:%S')
261 except:
262 return timestamp
263
264 def display_record(self, record: Dict):
265 """Display a journal record with full parsing."""
266 self.record_count += 1
267
268 record_id = record.get('recordId', 'unknown')
269 record_type = record.get('recordType', 'unknown')
270 timestamp = self.format_timestamp(record.get('timestamp', 'N/A'))
271 content = record.get('content', '')
272
273 emoji = self.type_emojis.get(record_type, 'π')
274
275 # Header
276 print(f"\n{'β'+'β'*78+'β'}")
277 print(f"β {emoji} RECORD #{self.record_count} - {record_type.upper():<60} β")
278 print(f"β Time: {timestamp:<71} β")
279 print(f"β ID: {record_id[:72]:<72} β")
280 print(f"β{'β'*78}β€")
281
282 # Parse content if it's a message
283 if record_type == 'message':
284 self.message_count += 1
285 parsed = self.parse_message_content(content)
286
287 if parsed:
288 # Show role
289 role = parsed.get('role', 'unknown')
290 print(f"β Role: {role:<72} β")
291
292 # Extract and display text content
293 texts = self.extract_text_from_content(parsed)
294
295 if texts:
296 print(f"β{'β'*78}β€")
297 for content_type, text in texts:
298 # Format content type
299 type_label = content_type.replace('_', ' ').title()
300 print(f"β [{type_label}]:{' '*(70-len(type_label))} β")
301
302 # Word wrap text
303 words = text.split()
304 line = ""
305 for word in words:
306 if len(line) + len(word) + 1 <= 74:
307 line += (word + " ")
308 else:
309 print(f"β {line:<76} β")
310 line = word + " "
311 if line:
312 print(f"β {line:<76} β")
313
314 # Show raw JSON if requested
315 if self.show_json:
316 print(f"β{'β'*78}β€")
317 print(f"β [Raw JSON]:{' '*66} β")
318 json_str = json.dumps(parsed, indent=2)
319 for line in json_str.split('\n')[:10]: # Limit to 10 lines
320 print(f"β {line[:76]:<76} β")
321 if len(json_str.split('\n')) > 10:
322 print(f"β ... (truncated){' '*62} β")
323 else:
324 # Show raw content if not parseable
325 print(f"β Content: {content[:68]:<68} β")
326 if len(content) > 68:
327 print(f"β ... (truncated){' '*62} β")
328 else:
329 # Non-message records
330 print(f"β Content: {content[:68]:<68} β")
331 if len(content) > 68:
332 remaining = content[68:]
333 while remaining:
334 chunk = remaining[:76]
335 print(f"β {chunk:<76} β")
336 remaining = remaining[76:]
337
338 print(f"β{'β'*78}β")
339
340 def check_task_status(self, task_id):
341 """Check if task is completed."""
342 try:
343 response = self.client.get_task(
344 agentSpaceId=self.agent_space_id,
345 taskId=task_id
346 )
347
348 task = response['task']
349 status = task['status']
350
351 return status in ['COMPLETED', 'FAILED', 'CANCELED', 'CANCELLED']
352
353 except ClientError:
354 return False
355
356 def monitor_task(self, task_id, execution_id, timed_interventions: List[Tuple[float, str]],
357 poll_interval=2, max_duration=300):
358 """Monitor task execution with timed intervention messages."""
359 print(f"\n{'='*80}")
360 print("MONITORING AGENT ACTIVITY (TIMED INTERVENTIONS)")
361 print(f"{'='*80}")
362 print(f"π Configuration:")
363 print(f" β’ Poll interval: {poll_interval}s")
364 print(f" β’ Max duration: {max_duration}s")
365 print(f" β’ Timed interventions: {len(timed_interventions)}")
366 for i, (delay, msg) in enumerate(timed_interventions, 1):
367 print(f" {i}. After {delay}s: '{msg[:50]}{'...' if len(msg) > 50 else ''}'")
368 print(f" β’ Show JSON: {self.show_json}")
369 print(f" β’ Show thinking: {self.show_thinking}")
370 print(f" β’ Verbose: {self.verbose}")
371 print(f"{'β'*80}")
372
373 start_time = time.time()
374
375 while True:
376 # Check if we've exceeded max duration
377 elapsed = time.time() - start_time
378 if elapsed > max_duration:
379 print(f"\nβ±οΈ Max duration ({max_duration}s) reached. Stopping monitor.")
380 break
381
382 # Send timed intervention messages (only once per intervention)
383 for intervention_id, (delay_seconds, message) in enumerate(timed_interventions):
384 if (elapsed >= delay_seconds and
385 intervention_id not in self.interventions_sent):
386 self.send_message(task_id, execution_id, message, intervention_id)
387 self.interventions_sent.add(intervention_id)
388
389 # Fetch new records
390 new_records = self.fetch_new_records(execution_id)
391
392 # Display new records
393 for record in new_records:
394 self.display_record(record)
395
396 # Check if task is completed (but don't exit - continue for timed interventions)
397 task_completed = self.check_task_status(task_id)
398 if task_completed and not hasattr(self, '_task_completion_logged'):
399 print(f"\n{'β'*80}")
400 print("β
TASK COMPLETED")
401 print(f"{'β'*80}")
402 self._task_completion_logged = True
403
404 # Wait before next poll
405 time.sleep(poll_interval)
406
407 # Final summary
408 duration = time.time() - start_time
409 interventions_completed = len(self.interventions_sent)
410 print(f"\n{'='*80}")
411 print("MONITORING SUMMARY")
412 print(f"{'='*80}")
413 print(f"π Statistics:")
414 print(f" β’ Total records: {self.record_count}")
415 print(f" β’ Message records: {self.message_count}")
416 print(f" β’ Interventions sent: {interventions_completed}/{len(timed_interventions)}")
417 print(f" β’ Duration: {duration:.1f}s")
418 print(f"{'='*80}\n")
419
420
421def main():
422 """Run the timed interactive monitoring demo."""
423
424 # Get configuration
425 config = get_config()
426 config.print_configuration()
427
428 # Configuration - can be overridden with environment variables
429 agent_space_id = config.agent_space_id or os.getenv('AGENT_SPACE_ID')
430 user_id = config.user_id
431 region = config.region
432 create_new_investigation = os.getenv('CREATE_NEW_INVESTIGATION', 'false').lower() == 'true'
433
434 # Timed interventions: (delay_seconds, message)
435 timed_interventions = [
436 (2.0, "Please investigate the high memory usage issue on our production servers"),
437 (6.0, "The issue started around 3 PM today and affects multiple EC2 instances in us-east-1"),
438 (10.0, "Customers are reporting slow response times, please prioritize this investigation")
439 ]
440
441 # Monitoring configuration
442 poll_interval = 1 # Poll every 1 second
443 max_duration = 15 # Stop after 10 seconds for demo
444
445 # Display options
446 show_json = os.getenv('SHOW_JSON', 'false').lower() == 'true'
447 show_thinking = os.getenv('SHOW_THINKING', 'true').lower() == 'true'
448 verbose = os.getenv('VERBOSE', 'true').lower() == 'true'
449
450 print("\n" + "="*80)
451 print(" "*20 + "TIMED AGENT MONITORING DEMO")
452 print(" "*15 + "(Creating new investigation)")
453 print("="*80)
454 print(f"Agent Space ID: {agent_space_id}")
455 print(f"User ID: {user_id}")
456 print()
457
458 # Create boto3 client
459 client = boto3.client('community-devops-agent', region_name=region)
460
461 # Create monitor
462 monitor = TimedAgentMonitor(
463 client, agent_space_id, user_id,
464 show_json=show_json,
465 show_thinking=show_thinking,
466 verbose=verbose
467 )
468
469 if create_new_investigation:
470 print("Creating new investigation task...")
471 title = "High Memory Usage Investigation"
472 description = "Investigate high memory usage on production servers affecting customer experience"
473 task_id = monitor.create_task(title, description)
474
475 if not task_id:
476 print("Failed to create investigation task")
477 return
478
479 # Wait for task to start
480 if not monitor.wait_for_task_start(task_id):
481 print("Failed to wait for task to start")
482 return
483 else:
484 # Find existing task to use for monitoring
485 print("Finding existing task for monitoring...")
486 try:
487 response = client.list_tasks(
488 agentSpaceId=agent_space_id,
489 limit=10
490 )
491
492 tasks = response.get('tasks', [])
493 if not tasks:
494 print("No existing tasks found. Creating a new investigation task...")
495 # Fallback: create new task if no existing ones
496 title = "Timed Interactive Demo"
497 description = "Demonstration with timed intervention messages"
498 task_id = monitor.create_task(title, description)
499
500 if not task_id:
501 print("Failed to create investigation task")
502 return
503
504 # Wait for task to start
505 if not monitor.wait_for_task_start(task_id):
506 print("Failed to wait for task to start")
507 return
508 else:
509 # Use the first existing task
510 first_task = tasks[0]
511 task_id = first_task['taskId']
512 task_type = first_task.get('taskType', 'unknown')
513 status = first_task.get('status', 'unknown')
514
515 print(f"Using existing task: {task_id}")
516 print(f" Type: {task_type}")
517 print(f" Status: {status}")
518
519 except ClientError as e:
520 print(f"Error listing tasks: {e}")
521 print("Creating a new investigation task as fallback...")
522 # Fallback: create new task if listing fails
523 title = "Timed Interactive Demo"
524 description = "Demonstration with timed intervention messages"
525 task_id = monitor.create_task(title, description)
526
527 if not task_id:
528 print("Failed to create investigation task")
529 return
530
531 # Wait for task to start
532 if not monitor.wait_for_task_start(task_id):
533 print("Failed to wait for task to start")
534 return
535
536 # Get execution ID by sending initial chat message
537 execution_id = monitor.get_execution_id_from_chat(task_id)
538
539 if not execution_id:
540 print("Failed to get execution ID from chat")
541 return
542
543 # Monitor task with timed interventions
544 monitor.monitor_task(
545 task_id=task_id,
546 execution_id=execution_id,
547 timed_interventions=timed_interventions,
548 poll_interval=poll_interval,
549 max_duration=max_duration
550 )
551
552 print("β¨ Timed monitoring demo completed!")
553 print(f" Task ID: {task_id}")
554 print(f" Execution ID: {execution_id}")
555 print(f" Total records displayed: {monitor.record_count}")
556 print(f" Message records: {monitor.message_count}")
557 print(f" Interventions sent: {len(monitor.interventions_sent)}/{len(timed_interventions)}\n")
558
559
560if __name__ == '__main__':
561 main()
Enhanced interactive monitoring:
1#!/usr/bin/env python3
2"""
3Enhanced Interactive Agent Monitoring Example
4
5This example demonstrates comprehensive monitoring with:
61. Full JSON parsing of all record types
72. Rich formatted output with all details
83. Configurable verbosity levels
94. Support for all record types (message, activity, observation, etc.)
105. Interactive intervention with message injection
11
12The script shows EVERYTHING - all records, parsed JSON, metadata, etc.
13"""
14
15import devopsagent_api
16import boto3
17from botocore.exceptions import ClientError, WaiterError
18import time
19from datetime import datetime
20import os
21import json
22from typing import Dict, List, Any, Optional
23from config import get_config
24
25
26class EnhancedAgentMonitor:
27 """Monitor an active agent investigation with comprehensive output."""
28
29 def __init__(self, client, agent_space_id, user_id='interactive-user',
30 show_json=False, show_thinking=True, verbose=True):
31 self.client = client
32 self.agent_space_id = agent_space_id
33 self.user_id = user_id
34 self.seen_record_ids = set()
35 self.record_count = 0
36 self.message_count = 0
37 self.intervention_sent = False
38
39 # Display options
40 self.show_json = show_json
41 self.show_thinking = show_thinking
42 self.verbose = verbose
43
44 # Record type emojis
45 self.type_emojis = {
46 'message': 'π¬',
47 'activity': 'βοΈ',
48 'observation': 'ποΈ',
49 'symptom': 'β οΈ',
50 'finding': 'π',
51 'plan': 'π',
52 'investigation_summary': 'π'
53 }
54
55 def create_task(self, title, description):
56 """Create a new investigation task."""
57 print("\n" + "="*80)
58 print("CREATING INVESTIGATION TASK")
59 print("="*80)
60
61 try:
62 response = self.client.create_task(
63 agentSpaceId=self.agent_space_id,
64 title=title,
65 description=description,
66 taskType='INVESTIGATION',
67 priority='HIGH'
68 )
69
70 task = response['task']
71 task_id = task['taskId']
72
73 print(f"β Task created successfully!")
74 print(f" Task ID: {task_id}")
75 print(f" Title: {title}")
76 print(f" Status: {task['status']}")
77 print(f" Priority: {task['priority']}")
78
79 return task_id
80
81 except ClientError as e:
82 print(f"β Error creating task: {e}")
83 return None
84
85 def wait_for_task_start(self, task_id, timeout=60):
86 """Wait for task to start using waiter."""
87 print(f"\n{'β'*80}")
88 print("WAITING FOR AGENT TO START")
89 print(f"{'β'*80}")
90
91 try:
92 waiter = self.client.get_waiter('task_started')
93
94 print(f"β³ Waiting for task to start (timeout: {timeout}s)...")
95
96 waiter.wait(
97 agentSpaceId=self.agent_space_id,
98 taskId=task_id,
99 WaiterConfig={
100 'Delay': 2,
101 'MaxAttempts': timeout // 2
102 }
103 )
104
105 print(f"β Agent has started working on the task!")
106 return True
107
108 except WaiterError as e:
109 print(f"β Timeout waiting for task to start: {e}")
110 return False
111 except ClientError as e:
112 print(f"β Error: {e}")
113 return False
114
115 def get_execution_id(self, task_id):
116 """Get the execution ID for the task."""
117 try:
118 response = self.client.list_executions(
119 agentSpaceId=self.agent_space_id,
120 taskId=task_id,
121 limit=1
122 )
123
124 executions = response.get('executions', [])
125 if executions:
126 execution_id = executions[0]['executionId']
127 print(f" Execution ID: {execution_id}")
128 return execution_id
129
130 return None
131
132 except ClientError as e:
133 print(f"β Error getting execution: {e}")
134 return None
135
136 def send_message(self, task_id, execution_id, message):
137 """Send a chat message to the agent."""
138 try:
139 response = self.client.send_chat_message(
140 agentSpaceId=self.agent_space_id,
141 taskId=task_id,
142 executionId=execution_id,
143 content=message,
144 userId=self.user_id
145 )
146
147 print(f"\n{'β'+'β'*78+'β'}")
148 print(f"β π¬ USER INTERVENTION{' '*58}β")
149 print(f"β{' '*78}β")
150 print(f"β Message: {message[:64]:<64}β")
151 if len(message) > 64:
152 print(f"β {message[64:]:<64}β")
153 print(f"β Message ID: {response['messages'][0]['messageId']:<59}β")
154 print(f"β{'β'*78}β\n")
155
156 self.intervention_sent = True
157 return True
158
159 except ClientError as e:
160 print(f"\nβ Error sending message: {e}\n")
161 return False
162
163 def fetch_new_records(self, execution_id):
164 """Fetch new journal records (all types)."""
165 try:
166 response = self.client.get_journal_records(
167 agentSpaceId=self.agent_space_id,
168 executionId=execution_id,
169 order='ASC',
170 limit=100
171 )
172
173 records = response.get('records', [])
174
175 # Filter out records we've already seen
176 new_records = [
177 r for r in records
178 if r['recordId'] not in self.seen_record_ids
179 ]
180
181 # Mark as seen
182 for record in new_records:
183 self.seen_record_ids.add(record['recordId'])
184
185 return new_records
186
187 except ClientError as e:
188 # Silently handle errors during polling
189 return []
190
191 def parse_message_content(self, content_str: str) -> Optional[Dict]:
192 """Parse JSON message content."""
193 try:
194 return json.loads(content_str)
195 except (json.JSONDecodeError, TypeError):
196 return None
197
198 def extract_text_from_content(self, content_data: Dict) -> List[str]:
199 """Extract readable text from parsed message content."""
200 texts = []
201
202 if not isinstance(content_data, dict):
203 return texts
204
205 # Extract from content array
206 content_array = content_data.get('content', [])
207 if isinstance(content_array, list):
208 for item in content_array:
209 if isinstance(item, dict):
210 # Text content
211 if 'text' in item:
212 texts.append(('text', item['text']))
213
214 # Thinking content
215 if 'thinking' in item and self.show_thinking:
216 texts.append(('thinking', item['thinking']))
217
218 # Tool use
219 if 'toolUse' in item:
220 tool_use = item['toolUse']
221 tool_name = tool_use.get('name', 'unknown')
222 texts.append(('tool_use', f"Tool: {tool_name}"))
223
224 # Tool result
225 if 'toolResult' in item:
226 tool_result = item['toolResult']
227 tool_id = tool_result.get('toolUseId', 'unknown')
228 texts.append(('tool_result', f"Tool Result: {tool_id}"))
229
230 return texts
231
232 def format_timestamp(self, timestamp: str) -> str:
233 """Format timestamp for display."""
234 if not timestamp or timestamp == 'N/A':
235 return 'N/A'
236
237 try:
238 dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
239 return dt.strftime('%H:%M:%S')
240 except:
241 return timestamp
242
243 def display_record(self, record: Dict):
244 """Display a journal record with full parsing."""
245 self.record_count += 1
246
247 record_id = record.get('recordId', 'unknown')
248 record_type = record.get('recordType', 'unknown')
249 timestamp = self.format_timestamp(record.get('timestamp', 'N/A'))
250 content = record.get('content', '')
251
252 emoji = self.type_emojis.get(record_type, 'π')
253
254 # Header
255 print(f"\n{'β'+'β'*78+'β'}")
256 print(f"β {emoji} RECORD #{self.record_count} - {record_type.upper():<60} β")
257 print(f"β Time: {timestamp:<71} β")
258 print(f"β ID: {record_id[:72]:<72} β")
259 print(f"β{'β'*78}β€")
260
261 # Parse content if it's a message
262 if record_type == 'message':
263 self.message_count += 1
264 parsed = self.parse_message_content(content)
265
266 if parsed:
267 # Show role
268 role = parsed.get('role', 'unknown')
269 print(f"β Role: {role:<72} β")
270
271 # Extract and display text content
272 texts = self.extract_text_from_content(parsed)
273
274 if texts:
275 print(f"β{'β'*78}β€")
276 for content_type, text in texts:
277 # Format content type
278 type_label = content_type.replace('_', ' ').title()
279 print(f"β [{type_label}]:{' '*(70-len(type_label))} β")
280
281 # Word wrap text
282 words = text.split()
283 line = ""
284 for word in words:
285 if len(line) + len(word) + 1 <= 74:
286 line += (word + " ")
287 else:
288 print(f"β {line:<76} β")
289 line = word + " "
290 if line:
291 print(f"β {line:<76} β")
292
293 # Show raw JSON if requested
294 if self.show_json:
295 print(f"β{'β'*78}β€")
296 print(f"β [Raw JSON]:{' '*66} β")
297 json_str = json.dumps(parsed, indent=2)
298 for line in json_str.split('\n')[:10]: # Limit to 10 lines
299 print(f"β {line[:76]:<76} β")
300 if len(json_str.split('\n')) > 10:
301 print(f"β ... (truncated){' '*62} β")
302 else:
303 # Show raw content if not parseable
304 print(f"β Content: {content[:68]:<68} β")
305 if len(content) > 68:
306 print(f"β ... (truncated){' '*62} β")
307 else:
308 # Non-message records
309 print(f"β Content: {content[:68]:<68} β")
310 if len(content) > 68:
311 remaining = content[68:]
312 while remaining:
313 chunk = remaining[:76]
314 print(f"β {chunk:<76} β")
315 remaining = remaining[76:]
316
317 print(f"β{'β'*78}β")
318
319 def check_task_status(self, task_id):
320 """Check if task is completed."""
321 try:
322 response = self.client.get_task(
323 agentSpaceId=self.agent_space_id,
324 taskId=task_id
325 )
326
327 task = response['task']
328 status = task['status']
329
330 return status in ['COMPLETED', 'FAILED', 'CANCELED', 'CANCELLED']
331
332 except ClientError:
333 return False
334
335 def monitor_task(self, task_id, execution_id, intervention_message,
336 intervention_after=5, poll_interval=2, max_duration=300):
337 """Monitor task execution with comprehensive output."""
338 print(f"\n{'='*80}")
339 print("MONITORING AGENT ACTIVITY (ENHANCED MODE)")
340 print(f"{'='*80}")
341 print(f"π Configuration:")
342 print(f" β’ Poll interval: {poll_interval}s")
343 print(f" β’ Intervention after: {intervention_after} messages")
344 print(f" β’ Max duration: {max_duration}s")
345 print(f" β’ Show JSON: {self.show_json}")
346 print(f" β’ Show thinking: {self.show_thinking}")
347 print(f" β’ Verbose: {self.verbose}")
348 print(f"{'β'*80}")
349
350 start_time = time.time()
351
352 while True:
353 # Check if we've exceeded max duration
354 elapsed = time.time() - start_time
355 if elapsed > max_duration:
356 print(f"\nβ±οΈ Max duration ({max_duration}s) reached. Stopping monitor.")
357 break
358
359 # Fetch new records
360 new_records = self.fetch_new_records(execution_id)
361
362 # Display new records
363 for record in new_records:
364 self.display_record(record)
365
366 # Send intervention message after N messages
367 if (self.message_count >= intervention_after and
368 not self.intervention_sent):
369 self.send_message(task_id, execution_id, intervention_message)
370
371 # Check if task is completed
372 if self.check_task_status(task_id):
373 print(f"\n{'β'*80}")
374 print("β
TASK COMPLETED")
375 print(f"{'β'*80}")
376 break
377
378 # Wait before next poll
379 time.sleep(poll_interval)
380
381 # Final summary
382 duration = time.time() - start_time
383 print(f"\n{'='*80}")
384 print("MONITORING SUMMARY")
385 print(f"{'='*80}")
386 print(f"π Statistics:")
387 print(f" β’ Total records: {self.record_count}")
388 print(f" β’ Message records: {self.message_count}")
389 print(f" β’ Intervention sent: {'Yes' if self.intervention_sent else 'No'}")
390 print(f" β’ Duration: {duration:.1f}s")
391 print(f"{'='*80}\n")
392
393
394def main():
395 """Run the enhanced interactive monitoring example."""
396
397 # Get configuration
398 config = get_config()
399 config.print_configuration()
400
401 # Configuration
402 agent_space_id = config.agent_space_id or os.getenv('AGENT_SPACE_ID')
403 user_id = config.user_id
404
405 # Task configuration
406 task_title = 'Investigate high memory usage on production servers'
407 task_description = '''
408 Production servers are showing consistently high memory usage (>85%)
409 over the past 2 hours. Please investigate:
410 1. Which processes are consuming the most memory
411 2. Any memory leaks or unusual patterns
412 3. Recommendations for remediation
413 '''
414
415 # Monitoring configuration
416 intervention_message = "Can you also check if there are any disk I/O bottlenecks?"
417 intervention_after = 5 # Send message after 5 agent messages
418 poll_interval = 2 # Poll every 2 seconds
419 max_duration = 300 # Stop after 5 minutes
420
421 # Display options
422 show_json = os.getenv('SHOW_JSON', 'false').lower() == 'true'
423 show_thinking = os.getenv('SHOW_THINKING', 'true').lower() == 'true'
424 verbose = os.getenv('VERBOSE', 'true').lower() == 'true'
425
426 print("\n" + "="*80)
427 print(" "*20 + "ENHANCED AGENT MONITORING")
428 print("="*80)
429
430 # Create boto3 client
431 client = config.get_client()
432
433 # Create monitor
434 monitor = EnhancedAgentMonitor(
435 client, agent_space_id, user_id,
436 show_json=show_json,
437 show_thinking=show_thinking,
438 verbose=verbose
439 )
440
441 # Step 1: Create task
442 task_id = monitor.create_task(task_title, task_description)
443 if not task_id:
444 print("\nβ Failed to create task. Exiting.")
445 return
446
447 # Step 2: Wait for task to start
448 if not monitor.wait_for_task_start(task_id, timeout=60):
449 print("\nβ οΈ Task did not start in time. You may need to check the task manually.")
450 return
451
452 # Step 3: Get execution ID
453 execution_id = monitor.get_execution_id(task_id)
454 if not execution_id:
455 print("\nβ Could not get execution ID. Exiting.")
456 return
457
458 # Step 4: Monitor task with intervention
459 monitor.monitor_task(
460 task_id=task_id,
461 execution_id=execution_id,
462 intervention_message=intervention_message,
463 intervention_after=intervention_after,
464 poll_interval=poll_interval,
465 max_duration=max_duration
466 )
467
468 print("β¨ Enhanced monitoring completed!")
469 print(f" Task ID: {task_id}")
470 print(f" Total records displayed: {monitor.record_count}")
471 print(f" Message records: {monitor.message_count}\n")
472
473
474if __name__ == '__main__':
475 main()
Knowledge Base Managementο
1#!/usr/bin/env python3
2"""
3Community DevOps Agent - Knowledgebase Management Example
4
5This example demonstrates comprehensive knowledgebase management capabilities
6for the Community DevOps Agent API, including:
7
8- Creating knowledge items (runbooks, documentation, procedures, guides)
9- Listing and searching knowledge items with filtering
10- Retrieving specific knowledge items
11- Updating knowledge content and metadata
12- Managing lifecycle status (DRAFT β REVIEW β PUBLISHED β ARCHIVED)
13- Deleting knowledge items
14- Interactive management interface
15
16Knowledge items support multiple content types and can store operational
17procedures, troubleshooting guides, best practices, and reference materials.
18
19Usage:
20 python examples/knowledgebase_management.py
21
22Environment Variables:
23 AGENT_SPACE_ID - Your DevOps Agent space UUID (required)
24 AWS_REGION - AWS region (default: us-east-1)
25"""
26
27import os
28import sys
29import json
30import uuid
31from datetime import datetime
32from typing import Dict, List, Optional, Any
33from dataclasses import dataclass
34
35# Add the project root to Python path
36sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
37
38import devopsagent_api
39import boto3
40from botocore.exceptions import ClientError
41
42
43@dataclass
44class KnowledgeItem:
45 """Knowledge item data structure"""
46 knowledge_item_id: str
47 agent_space_id: str
48 title: str
49 content: Dict[str, str]
50 knowledge_item_type: str
51 life_cycle_status: str
52 properties: Optional[Dict[str, str]] = None
53 source_references: Optional[List[str]] = None
54 created_at: Optional[str] = None
55 updated_at: Optional[str] = None
56 version: Optional[int] = None
57
58
59class KnowledgebaseManager:
60 """Knowledgebase management client for DevOps Agent"""
61
62 def __init__(self, agent_space_id: str, region_name: str = None):
63 """Initialize the knowledgebase manager"""
64 if region_name is None:
65 from config import get_config
66 config = get_config()
67 region_name = config.region
68 self.agent_space_id = agent_space_id
69 self.region_name = region_name
70 self.client = boto3.client(
71 'community-devops-agent',
72 region_name=region_name
73 )
74 print(f"π€ Knowledgebase Manager initialized for agent space: {agent_space_id}")
75
76 def create_knowledge_item(
77 self,
78 title: str,
79 content: str,
80 content_type: str = "text/markdown",
81 knowledge_item_type: str = "RUNBOOK",
82 life_cycle_status: str = "ACTIVE",
83 properties: Optional[Dict[str, str]] = None,
84 source_references: Optional[List[str]] = None
85 ) -> KnowledgeItem:
86 """Create a new knowledge item"""
87 try:
88 # Build request parameters dynamically to avoid sending None values
89 params = {
90 'agentSpaceId': self.agent_space_id,
91 'title': title,
92 'content': {
93 'type': content_type,
94 'value': content
95 },
96 'knowledgeItemType': knowledge_item_type
97 }
98
99 # Only add optional parameters if they have values
100 if life_cycle_status:
101 params['lifeCycleStatus'] = life_cycle_status
102 if properties is not None:
103 params['properties'] = properties
104 if source_references:
105 params['sourceReferences'] = source_references
106
107 response = self.client.create_knowledge_item(**params)
108
109 # The API returns 201 Created but may not return the full item data
110 # For now, we'll create a minimal KnowledgeItem with the data we sent
111 return KnowledgeItem(
112 knowledge_item_id="created", # We don't get the ID back
113 agent_space_id=self.agent_space_id,
114 title=title,
115 content={
116 'type': content_type,
117 'value': content
118 },
119 knowledge_item_type=knowledge_item_type,
120 life_cycle_status=life_cycle_status or "ACTIVE",
121 properties=properties,
122 source_references=source_references,
123 created_at=None, # We don't get timestamps back
124 updated_at=None,
125 version=None
126 )
127
128 except ClientError as e:
129 print(f"β Failed to create knowledge item: {e}")
130 raise
131
132 def get_knowledge_item(self, knowledge_item_id: str) -> KnowledgeItem:
133 """Retrieve a specific knowledge item"""
134 try:
135 response = self.client.get_knowledge_item(
136 agentSpaceId=self.agent_space_id,
137 knowledgeItemId=knowledge_item_id
138 )
139
140 # The API returns success but may not include the full item data
141 # Check if response contains expected data, if not, operation still succeeded
142 try:
143 item_data = response['knowledgeItem']
144 return KnowledgeItem(**item_data)
145 except KeyError:
146 # API succeeded but didn't return item data - this is expected
147 # Return a minimal KnowledgeItem to indicate success
148 return KnowledgeItem(
149 knowledge_item_id=knowledge_item_id,
150 agent_space_id=self.agent_space_id,
151 title="Retrieved Item", # Placeholder
152 content={'type': 'text/plain', 'value': 'Item retrieved successfully'},
153 knowledge_item_type="RUNBOOK",
154 life_cycle_status="ACTIVE"
155 )
156
157 except ClientError as e:
158 print(f"β Failed to get knowledge item: {e}")
159 raise
160
161 def update_knowledge_item(
162 self,
163 knowledge_item_id: str,
164 title: Optional[str] = None,
165 content: Optional[str] = None,
166 content_type: Optional[str] = None,
167 knowledge_item_type: Optional[str] = None,
168 life_cycle_status: Optional[str] = None,
169 properties: Optional[Dict[str, str]] = None,
170 source_references: Optional[List[str]] = None
171 ) -> KnowledgeItem:
172 """Update an existing knowledge item"""
173 try:
174 update_params = {
175 'agentSpaceId': self.agent_space_id,
176 'knowledgeItemId': knowledge_item_id
177 }
178
179 if title is not None:
180 update_params['title'] = title
181
182 if content is not None or content_type is not None:
183 current_item = self.get_knowledge_item(knowledge_item_id)
184 update_params['content'] = {
185 'type': content_type or current_item.content['type'],
186 'value': content if content is not None else current_item.content['value']
187 }
188
189 if knowledge_item_type is not None:
190 update_params['knowledgeItemType'] = knowledge_item_type
191
192 if life_cycle_status is not None:
193 update_params['lifeCycleStatus'] = life_cycle_status
194
195 if properties is not None:
196 update_params['properties'] = properties
197
198 if source_references is not None:
199 update_params['sourceReferences'] = source_references
200
201 response = self.client.update_knowledge_item(**update_params)
202
203 # The API returns success but may not include the full item data
204 # Check if response contains expected data, if not, operation still succeeded
205 try:
206 item_data = response['knowledgeItem']
207 return KnowledgeItem(**item_data)
208 except KeyError:
209 # API succeeded but didn't return item data - this is expected
210 # Return a minimal KnowledgeItem to indicate success
211 return KnowledgeItem(
212 knowledge_item_id=knowledge_item_id,
213 agent_space_id=self.agent_space_id,
214 title="Updated Item", # Placeholder
215 content={'type': 'text/plain', 'value': 'Item updated successfully'},
216 knowledge_item_type="RUNBOOK",
217 life_cycle_status="ACTIVE",
218 properties=properties
219 )
220
221 except ClientError as e:
222 print(f"β Failed to update knowledge item: {e}")
223 raise
224
225 def delete_knowledge_item(self, knowledge_item_id: str) -> bool:
226 """Delete a knowledge item"""
227 try:
228 response = self.client.delete_knowledge_item(
229 agentSpaceId=self.agent_space_id,
230 knowledgeItemId=knowledge_item_id
231 )
232
233 # The API returns success but may not include a 'success' field
234 # Check if response contains expected data, if not, operation still succeeded
235 try:
236 return response.get('success', True) # Default to True if field exists but is missing
237 except KeyError:
238 # API succeeded but didn't return success field - this is expected
239 # Since we got here without a ClientError, the operation succeeded
240 return True
241
242 except ClientError as e:
243 print(f"β Failed to delete knowledge item: {e}")
244 raise
245
246 def list_knowledge_items(
247 self,
248 limit: int = 50,
249 next_token: Optional[str] = None,
250 knowledge_item_type: Optional[str] = None,
251 life_cycle_status: Optional[str] = None,
252 last_modified_after: Optional[str] = None,
253 last_modified_before: Optional[str] = None
254 ) -> Dict[str, Any]:
255 """List knowledge items with optional filtering"""
256 try:
257 params = {
258 'agentSpaceId': self.agent_space_id,
259 'limit': limit
260 }
261
262 if next_token:
263 params['nextToken'] = next_token
264 if knowledge_item_type:
265 params['knowledgeItemType'] = knowledge_item_type
266 if life_cycle_status:
267 params['lifeCycleStatus'] = life_cycle_status
268 if last_modified_after:
269 params['lastModifiedAfter'] = last_modified_after
270 if last_modified_before:
271 params['lastModifiedBefore'] = last_modified_before
272
273 response = self.client.list_knowledge_items(**params)
274
275 # Convert response items to KnowledgeItem objects
276 knowledge_items = []
277 for item_data in response['knowledgeItems']:
278 # Handle API response field name differences
279 processed_data = {}
280 for key, value in item_data.items():
281 # Convert camelCase to snake_case for dataclass
282 if key == 'knowledgeItemId':
283 processed_data['knowledge_item_id'] = value
284 elif key == 'agentSpaceId':
285 processed_data['agent_space_id'] = value
286 elif key == 'knowledgeItemType':
287 processed_data['knowledge_item_type'] = value
288 elif key == 'lifeCycleStatus':
289 processed_data['life_cycle_status'] = value
290 elif key == 'sourceReferences':
291 processed_data['source_references'] = value
292 elif key == 'createdAt':
293 processed_data['created_at'] = value
294 elif key == 'updatedAt':
295 processed_data['updated_at'] = value
296 else:
297 # Keep other fields as-is (they might already be snake_case)
298 processed_data[key] = value
299
300 # Extract title and description from properties.runbook if available
301 if 'properties' in processed_data and isinstance(processed_data['properties'], dict):
302 runbook_data = processed_data['properties'].get('runbook', {})
303 if isinstance(runbook_data, dict):
304 if 'title' not in processed_data and 'title' in runbook_data:
305 processed_data['title'] = runbook_data['title']
306 if 'description' in runbook_data:
307 # Store description in content for now
308 processed_data['content'] = {
309 'type': 'text/plain',
310 'value': runbook_data['description']
311 }
312
313 # Set defaults for missing required fields
314 processed_data.setdefault('agent_space_id', self.agent_space_id) # Use the one we queried with
315 processed_data.setdefault('title', f"Knowledge Item {processed_data['knowledge_item_id'][:8]}...")
316 processed_data.setdefault('content', {'type': 'text/plain', 'value': 'Content not available in list view'})
317 processed_data.setdefault('updated_at', None)
318
319 # Ensure all required fields are present
320 required_fields = ['knowledge_item_id', 'agent_space_id', 'title', 'content', 'knowledge_item_type', 'life_cycle_status']
321 missing_fields = [field for field in required_fields if field not in processed_data]
322 if missing_fields:
323 print(f"β οΈ Warning: Missing required fields in API response: {missing_fields}")
324 # Skip this item if required fields are missing
325 continue
326
327 knowledge_items.append(KnowledgeItem(**processed_data))
328
329 return {
330 'knowledge_items': knowledge_items,
331 'next_token': response.get('nextToken')
332 }
333
334 except ClientError as e:
335 print(f"β Failed to list knowledge items: {e}")
336 raise
337
338 def create_runbook(
339 self,
340 title: str,
341 description: str,
342 steps: List[str],
343 prerequisites: Optional[List[str]] = None,
344 tags: Optional[List[str]] = None
345 ) -> KnowledgeItem:
346 """Create a runbook knowledge item"""
347 content = f"""# {title}
348
349## Description
350{description}
351
352## Prerequisites
353{chr(10).join(f"- {prereq}" for prereq in (prerequisites or []))}
354
355## Steps
356{chr(10).join(f"{i+1}. {step}" for i, step in enumerate(steps))}
357
358## Tags
359{", ".join(tags or [])}
360"""
361
362 properties = {
363 'description': description,
364 'step_count': str(len(steps)),
365 'has_prerequisites': str(bool(prerequisites))
366 }
367
368 if tags:
369 properties['tags'] = ','.join(tags)
370
371 return self.create_knowledge_item(
372 title=title,
373 content=content,
374 content_type="text/markdown",
375 knowledge_item_type="RUNBOOK",
376 life_cycle_status="ACTIVE",
377 properties={
378 'runbook': {
379 'title': title,
380 'description': description
381 }
382 }
383 )
384
385 def create_troubleshooting_guide(
386 self,
387 title: str,
388 problem_description: str,
389 symptoms: List[str],
390 solutions: List[Dict[str, Any]],
391 prevention_tips: Optional[List[str]] = None
392 ) -> KnowledgeItem:
393 """Create a troubleshooting guide"""
394 content = f"""# {title}
395
396## Problem Description
397{problem_description}
398
399## Symptoms
400{chr(10).join(f"- {symptom}" for symptom in symptoms)}
401
402## Solutions
403"""
404
405 for i, solution in enumerate(solutions, 1):
406 content += f"""
407### Solution {i}: {solution.get('title', 'Untitled')}
408**Priority:** {solution.get('priority', 'Medium')}
409
410{solution.get('description', '')}
411
412**Steps:**
413{chr(10).join(f"1. {step}" for step in solution.get('steps', []))}
414"""
415
416 if prevention_tips:
417 content += f"""
418## Prevention Tips
419{chr(10).join(f"- {tip}" for tip in prevention_tips)}
420"""
421
422 properties = {
423 'problem_type': 'troubleshooting',
424 'solution_count': str(len(solutions)),
425 'symptom_count': str(len(symptoms))
426 }
427
428 return self.create_knowledge_item(
429 title=title,
430 content=content,
431 content_type="text/markdown",
432 knowledge_item_type="RUNBOOK",
433 life_cycle_status="ACTIVE",
434 properties={
435 'runbook': {
436 'title': title,
437 'description': problem_description
438 }
439 }
440 )
441
442
443def display_knowledge_item(item: KnowledgeItem):
444 """Display a knowledge item in a formatted way"""
445 print(f"\nπ Knowledge Item: {item.title}")
446 print(f" ID: {item.knowledge_item_id}")
447 print(f" Type: {item.knowledge_item_type}")
448 print(f" Status: {item.life_cycle_status}")
449 print(f" Created: {item.created_at}")
450 print(f" Updated: {item.updated_at}")
451 print(f" Version: {item.version}")
452
453 if item.properties:
454 print(f" Properties: {json.dumps(item.properties, indent=2)}")
455
456 print(f"\nπ Content ({item.content['type']}):")
457 print("-" * 50)
458 print(item.content['value'])
459 print("-" * 50)
460
461
462def interactive_menu(manager: KnowledgebaseManager):
463 """Interactive menu for knowledgebase management"""
464 while True:
465 print("\n" + "="*60)
466 print("π§ DevOps Agent Knowledgebase Management")
467 print("="*60)
468 print("1. π Create new knowledge item")
469 print("2. π List knowledge items")
470 print("3. π Get specific knowledge item")
471 print("4. βοΈ Update knowledge item")
472 print("5. ποΈ Delete knowledge item")
473 print("6. π Create runbook")
474 print("7. π§ Create troubleshooting guide")
475 print("8. π Update lifecycle status")
476 print("9. π Show statistics")
477 print("0. πͺ Exit")
478 print("="*60)
479
480 choice = input("Choose an option (0-9): ").strip()
481
482 try:
483 if choice == "0":
484 print("π Goodbye!")
485 break
486
487 elif choice == "1":
488 # Create knowledge item
489 title = input("Title: ").strip()
490 content = input("Content: ").strip()
491 content_type = input("Content type (text/markdown): ").strip() or "text/markdown"
492 item_type = input("Type (POLICY/RUNBOOK): ").strip() or "RUNBOOK"
493
494 item = manager.create_knowledge_item(
495 title=title,
496 content=content,
497 content_type=content_type,
498 knowledge_item_type=item_type
499 )
500 print(f"β
Created knowledge item: {item.knowledge_item_id}")
501 display_knowledge_item(item)
502
503 elif choice == "2":
504 # List knowledge items
505 item_type = input("Filter by type (optional): ").strip() or None
506 status = input("Filter by status (optional): ").strip() or None
507
508 result = manager.list_knowledge_items(
509 knowledge_item_type=item_type,
510 life_cycle_status=status
511 )
512
513 print(f"\nπ Found {len(result['knowledge_items'])} knowledge items:")
514 for item in result['knowledge_items']:
515 print(f" β’ {item.knowledge_item_id}: {item.title} ({item.knowledge_item_type}, {item.life_cycle_status})")
516
517 elif choice == "3":
518 # Get specific item
519 item_id = input("Knowledge item ID: ").strip()
520 item = manager.get_knowledge_item(item_id)
521 display_knowledge_item(item)
522
523 elif choice == "4":
524 # Update item
525 item_id = input("Knowledge item ID: ").strip()
526 title = input("New title (leave empty to keep current): ").strip() or None
527 content = input("New content (leave empty to keep current): ").strip() or None
528
529 item = manager.update_knowledge_item(
530 knowledge_item_id=item_id,
531 title=title,
532 content=content
533 )
534 print("β
Updated knowledge item")
535 display_knowledge_item(item)
536
537 elif choice == "5":
538 # Delete item
539 item_id = input("Knowledge item ID: ").strip()
540 confirm = input(f"Are you sure you want to delete {item_id}? (yes/no): ").strip().lower()
541 if confirm == "yes":
542 success = manager.delete_knowledge_item(item_id)
543 if success:
544 print("β
Knowledge item deleted successfully")
545 else:
546 print("β Failed to delete knowledge item")
547 else:
548 print("β Deletion cancelled")
549
550 elif choice == "6":
551 # Create runbook
552 title = input("Runbook title: ").strip()
553 description = input("Description: ").strip()
554 steps_input = input("Steps (comma-separated): ").strip()
555 steps = [step.strip() for step in steps_input.split(",") if step.strip()]
556
557 item = manager.create_runbook(
558 title=title,
559 description=description,
560 steps=steps
561 )
562 print(f"β
Created runbook: {item.knowledge_item_id}")
563 display_knowledge_item(item)
564
565 elif choice == "7":
566 # Create troubleshooting guide
567 title = input("Guide title: ").strip()
568 problem = input("Problem description: ").strip()
569 symptoms_input = input("Symptoms (comma-separated): ").strip()
570 symptoms = [s.strip() for s in symptoms_input.split(",") if s.strip()]
571
572 solution_title = input("Solution title: ").strip()
573 solution_desc = input("Solution description: ").strip()
574 solution_steps_input = input("Solution steps (comma-separated): ").strip()
575 solution_steps = [s.strip() for s in solution_steps_input.split(",") if s.strip()]
576
577 solutions = [{
578 'title': solution_title,
579 'description': solution_desc,
580 'steps': solution_steps,
581 'priority': 'High'
582 }]
583
584 item = manager.create_troubleshooting_guide(
585 title=title,
586 problem_description=problem,
587 symptoms=symptoms,
588 solutions=solutions
589 )
590 print(f"β
Created troubleshooting guide: {item.knowledge_item_id}")
591 display_knowledge_item(item)
592
593 elif choice == "8":
594 # Update lifecycle status
595 item_id = input("Knowledge item ID: ").strip()
596 print("Available statuses: ACTIVE, INACTIVE")
597 new_status = input("New status: ").strip()
598
599 item = manager.update_knowledge_item(
600 knowledge_item_id=item_id,
601 life_cycle_status=new_status
602 )
603 print(f"β
Updated status to: {item.life_cycle_status}")
604
605 elif choice == "9":
606 # Show statistics
607 result = manager.list_knowledge_items(limit=1000)
608 items = result['knowledge_items']
609
610 stats = {
611 'total': len(items),
612 'by_type': {},
613 'by_status': {}
614 }
615
616 for item in items:
617 stats['by_type'][item.knowledge_item_type] = stats['by_type'].get(item.knowledge_item_type, 0) + 1
618 stats['by_status'][item.life_cycle_status] = stats['by_status'].get(item.life_cycle_status, 0) + 1
619
620 print(f"\nπ Knowledgebase Statistics:")
621 print(f" Total items: {stats['total']}")
622 print(f" By type: {stats['by_type']}")
623 print(f" By status: {stats['by_status']}")
624
625 else:
626 print("β Invalid choice. Please try again.")
627
628 except Exception as e:
629 print(f"β Error: {e}")
630
631
632def main():
633 """Main function"""
634 print("π€ Community DevOps Agent - Knowledgebase Management Example")
635 print("="*70)
636
637 # Get agent space ID
638 agent_space_id = os.getenv("AGENT_SPACE_ID")
639 if not agent_space_id:
640 print("β AGENT_SPACE_ID environment variable is required")
641 print(" Set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
642 sys.exit(1)
643
644 # Get AWS region
645 region = os.getenv("AWS_REGION", "us-east-1")
646
647 try:
648 # Initialize manager
649 manager = KnowledgebaseManager(agent_space_id, region)
650
651 # Check if running interactively
652 if len(sys.argv) > 1 and sys.argv[1] == "--demo":
653 # Run demo mode
654 run_demo(manager)
655 else:
656 # Run interactive mode
657 interactive_menu(manager)
658
659 except KeyboardInterrupt:
660 print("\nπ Interrupted by user")
661 except Exception as e:
662 print(f"β Fatal error: {e}")
663 sys.exit(1)
664
665
666def run_demo(manager: KnowledgebaseManager):
667 """Run a demonstration of knowledgebase features"""
668 print("\n㪠Running Knowledgebase Management Demo")
669 print("="*50)
670
671 try:
672 # Create a sample runbook to demonstrate creation
673 print("\nπ Creating a sample runbook...")
674 runbook = manager.create_runbook(
675 title="Demo: AWS Lambda Deployment Runbook",
676 description="Standard procedure for deploying Lambda functions in demo environment",
677 steps=[
678 "Review code changes and run tests",
679 "Update Lambda function code via AWS Console or CLI",
680 "Update environment variables if needed",
681 "Test function in staging environment",
682 "Deploy to production environment",
683 "Monitor CloudWatch logs for errors",
684 "Update documentation with any changes"
685 ],
686 prerequisites=[
687 "AWS CLI configured with appropriate permissions",
688 "Lambda function already exists",
689 "Code has been reviewed and approved"
690 ],
691 tags=["aws", "lambda", "deployment", "demo"]
692 )
693 print(f"β
Created runbook: {runbook.title}")
694
695 # List all knowledge items to show the new runbook and existing items
696 print("\nπ Listing all knowledge items...")
697 result = manager.list_knowledge_items()
698 items = result['knowledge_items']
699 print(f"Found {len(items)} items:")
700
701 for i, item in enumerate(items, 1):
702 print(f" {i}. {item.title} (ID: {item.knowledge_item_id[:12]}...)")
703
704 if len(items) < 3:
705 print("β οΈ Not enough items for full demo operations. Need at least 3 items.")
706 return
707
708 # Use items for testing operations (skip the newly created one for update/get/delete)
709 item1_id = items[0].knowledge_item_id # First existing item for update
710 item2_id = items[1].knowledge_item_id # Second existing item for get
711 item3_id = items[2].knowledge_item_id # Third existing item for delete
712
713 print(f"\nπ Using existing item IDs for operations:")
714 print(f" Item 1 (for update): {item1_id[:12]}... ({items[0].title})")
715 print(f" Item 2 (for get): {item2_id[:12]}... ({items[1].title})")
716 print(f" Item 3 (for delete): {item3_id[:12]}... ({items[2].title})")
717
718 # Update the first existing item
719 print(f"\nπ Updating item 1...")
720 try:
721 updated_item = manager.update_knowledge_item(
722 knowledge_item_id=item1_id,
723 properties={
724 'runbook': {
725 'title': f'Updated: {items[0].title}',
726 'description': f'Updated description for {items[0].title}'
727 }
728 }
729 )
730 print(f"β
Item 1 updated successfully")
731 except Exception as e:
732 print(f"β οΈ Update operation failed: {str(e)}")
733
734 # Get details of the second existing item
735 print(f"\nπ Getting details of item 2...")
736 try:
737 retrieved_item = manager.get_knowledge_item(item2_id)
738 print(f"β
Retrieved item 2 details (may be minimal response)")
739 except Exception as e:
740 print(f"β οΈ Get operation failed: {str(e)}")
741
742 # Delete the third existing item
743 print(f"\nποΈ Deleting item 3...")
744 try:
745 delete_success = manager.delete_knowledge_item(item3_id)
746 if delete_success:
747 print("β
Successfully deleted item 3")
748 else:
749 print("β οΈ Delete operation completed but returned no confirmation")
750 except Exception as e:
751 print(f"β οΈ Delete operation failed: {str(e)}")
752
753 # Final summary
754 print(f"\nπ Demo completed successfully!")
755 print(f"β
Created new runbook and tested all CRUD operations")
756 print(f"π Created runbook: '{runbook.title}'")
757 print(f"π Total items found: {len(items)}")
758 print(f"π Demonstrated Create, Update, Get, and Delete operations")
759
760 except Exception as e:
761 print(f"β Demo failed: {e}")
762 raise
763
764
765if __name__ == "__main__":
766 main()
Test Execution Discoveryο
1#!/usr/bin/env python3
2"""
3Test script for discovering chat-supporting execution IDs.
4
5This script tests the execution discovery logic without creating new investigations.
6"""
7
8import os
9import boto3
10from botocore.exceptions import ClientError
11from config import get_config
12
13# Import TimedAgentMonitor from the demo script
14import sys
15sys.path.append(os.path.dirname(__file__))
16from interactive_monitoring_timed_demo import TimedAgentMonitor
17
18
19def test_execution_discovery():
20 """Test finding chat-supporting executions."""
21
22 # Get configuration
23 config = get_config()
24 config.print_configuration()
25
26 # Configuration
27 agent_space_id = config.agent_space_id or os.getenv('AGENT_SPACE_ID')
28 user_id = config.user_id
29 region = config.region
30
31 print("π Testing Execution Discovery")
32 print("=" * 50)
33 print(f"Agent Space ID: {agent_space_id}")
34 print(f"User ID: {user_id}")
35 print(f"Region: {region}")
36 print()
37
38 # Create boto3 client
39 client = boto3.client('community-devops-agent', region_name=region)
40
41 # Create monitor
42 monitor = TimedAgentMonitor(client, agent_space_id, user_id)
43
44 # Test 1: Try with a dummy task_id (this should fail gracefully)
45 print("Test 1: Finding executions for a dummy task ID")
46 dummy_task_id = "dummy-task-id"
47 execution_id = monitor.get_execution_id(dummy_task_id)
48 if execution_id:
49 print(f"β
Found execution: {execution_id}")
50 else:
51 print("β No execution found for dummy task")
52
53 print()
54
55 # Test 2: Try to find executions across all tasks (if API allows)
56 print("Test 2: Finding chat-supporting executions across all tasks")
57 print("β API Limitation: list_executions requires a taskId parameter")
58 print(" Cannot discover executions without knowing a valid taskId first")
59 print()
60
61 # Test 3: List existing tasks and try the first one
62 print("Test 3: Listing existing tasks and trying the first one")
63 try:
64 response = client.list_tasks(
65 agentSpaceId=agent_space_id,
66 limit=10
67 )
68
69 tasks = response.get('tasks', [])
70 print(f"Found {len(tasks)} existing tasks")
71
72 if tasks:
73 first_task = tasks[0]
74 task_id = first_task['taskId']
75 task_type = first_task.get('taskType', 'unknown')
76 status = first_task.get('status', 'unknown')
77
78 print(f"Using first task: {task_id}")
79 print(f" Type: {task_type}")
80 print(f" Status: {status}")
81
82 execution_id = monitor.get_execution_id(task_id)
83 if execution_id:
84 print(f"β
Found chat-supporting execution: {execution_id}")
85 return execution_id
86 else:
87 print("β No chat-supporting execution found for first task")
88 else:
89 print("β No existing tasks found")
90
91 except ClientError as e:
92 print(f"β Error listing tasks: {e}")
93
94 return None
95
96
97if __name__ == '__main__':
98 execution_id = test_execution_discovery()
99 if execution_id:
100 print(f"\nπ Success! Chat-supporting execution ID: {execution_id}")
101 print("You can use this execution ID for testing chat messages.")
102 else:
103 print("\nπ₯ Test failed - no chat-supporting executions found.")
Goal Journal Timelineο
1#!/usr/bin/env python3
2"""
3Goal Journal Timeline - Fetch and display journal records from all executions for goals.
4
5This script retrieves all goals in an agent space, creates evaluation tasks for goals
6that don't have associated tasks, fetches executions for those tasks, and displays
7journal records in a chronological timeline organized by goal.
8"""
9
10import os
11import sys
12import json
13import uuid
14from datetime import datetime
15from typing import List, Dict, Any, Optional
16import boto3
17import devopsagent_api
18from devopsagent_api.models import (
19 Goal, GoalType, GoalStatus, Task, TaskType, TaskPriority,
20 Execution, JournalRecord, parse_journal_content
21)
22from devopsagent_api.exceptions import DevOpsAgentError
23from config import get_config
24
25
26def get_agent_space_id():
27 """Get agent space ID from environment or prompt user."""
28 agent_space_id = os.getenv('AGENT_SPACE_ID')
29 if not agent_space_id:
30 print("β AGENT_SPACE_ID environment variable not set")
31 print(" Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
32 print(" Or run: AGENT_SPACE_ID=your-uuid python examples/goal_journal_timeline.py")
33 sys.exit(1)
34 return agent_space_id
35
36
37def list_goals_for_agent_space(client, agent_space_id: str) -> List[Goal]:
38 """List all goals for an agent space."""
39 print(f"\nπ― Listing goals for agent space: {agent_space_id[:8]}...")
40 print("-" * 55)
41
42 goals = []
43 next_token = None
44
45 try:
46 while True:
47 # Build request parameters
48 request_params = {
49 'agentSpaceId': agent_space_id,
50 'limit': 50 # Get up to 50 goals at a time
51 }
52 if next_token:
53 request_params['nextToken'] = next_token
54
55 response = client.list_goals(**request_params)
56
57 batch = response.get('goals', [])
58 goals.extend(batch)
59
60 print(f" Found {len(batch)} goals in this batch")
61
62 next_token = response.get('nextToken')
63 if not next_token:
64 break
65
66 print(f"β
Total goals found: {len(goals)}")
67
68 # Sort goals by creation time
69 goals.sort(key=lambda x: x['createdAt'])
70
71 return goals
72
73 except DevOpsAgentError as e:
74 print(f"β Failed to list goals: {e}")
75 return []
76
77
78def create_task_for_goal(client, agent_space_id: str, goal: Dict[str, Any]) -> Optional[str]:
79 """
80 Create an evaluation task for a goal that doesn't have an associated task.
81
82 Returns the task_id of the created task, or None if creation failed.
83 """
84 goal_id = goal['goalId']
85 goal_type = goal['goalType']
86 goal_title = goal['title']
87
88 print(f"\nπ Creating evaluation task for goal: {goal_id[:8]}...")
89 print("-" * 60)
90
91 try:
92 # Create task with goal information in description
93 task_description = json.dumps({
94 "goal_id": goal_id,
95 "goalType": goal_type
96 })
97
98 response = client.create_task(
99 agentSpaceId=agent_space_id,
100 clientToken=str(uuid.uuid4()), # For idempotency
101 title=f"Evaluation task for Goal",
102 description=task_description,
103 taskType="EVALUATION",
104 priority="LOW"
105 )
106
107 task = response.get('task', {})
108 if task:
109 task_id = task['taskId']
110 print(f"β
Created evaluation task: {task_id[:8]}")
111 print(f" Title: {task['title']}")
112 print(f" Status: {task['status']}")
113 return task_id
114 else:
115 print("β Task creation failed - no task data returned")
116 return None
117
118 except DevOpsAgentError as e:
119 print(f"β Failed to create task for goal {goal_id[:8]}: {e}")
120 return None
121
122
123def get_task_for_goal(client, agent_space_id: str, goal: Dict[str, Any]) -> Optional[Task]:
124 """
125 Get the task associated with a goal, creating one if necessary.
126
127 Returns the task dict, or None if retrieval/creation failed.
128 """
129 goal_id = goal['goalId']
130 last_task_id = goal.get('lastTaskId')
131
132 if last_task_id:
133 print(f"\nπ Goal {goal_id[:8]} has existing task: {last_task_id[:8]}")
134 try:
135 response = client.get_task(
136 agentSpaceId=agent_space_id,
137 taskId=last_task_id
138 )
139 task = response.get('task', {})
140 if task:
141 print(f" β
Retrieved existing task (Status: {task['status']})")
142 return task
143 else:
144 print(" β Task not found, will create new one")
145 except DevOpsAgentError as e:
146 print(f" β Failed to get existing task: {e}")
147
148 # No existing task or failed to retrieve, create new one
149 task_id = create_task_for_goal(client, agent_space_id, goal)
150 if task_id:
151 try:
152 response = client.get_task(
153 agentSpaceId=agent_space_id,
154 taskId=task_id
155 )
156 task = response.get('task', {})
157 return task if task else None
158 except DevOpsAgentError as e:
159 print(f"β Failed to get newly created task: {e}")
160 return None
161 else:
162 return None
163
164
165def list_executions_for_task(client, agent_space_id: str, task_id: str) -> List[Execution]:
166 """List all executions for a specific task."""
167 print(f"\nπ Listing executions for task: {task_id[:8]}...")
168 print("-" * 50)
169
170 executions = []
171 next_token = None
172
173 try:
174 while True:
175 # Build request parameters
176 request_params = {
177 'agentSpaceId': agent_space_id,
178 'taskId': task_id,
179 'limit': 50 # Get up to 50 executions at a time
180 }
181 if next_token:
182 request_params['nextToken'] = next_token
183
184 response = client.list_executions(**request_params)
185
186 batch = response.get('executions', [])
187 executions.extend(batch)
188
189 print(f" Found {len(batch)} executions in this batch")
190
191 next_token = response.get('nextToken')
192 if not next_token:
193 break
194
195 print(f"β
Total executions found: {len(executions)}")
196
197 # Sort executions by creation time
198 executions.sort(key=lambda x: x['createdAt'])
199
200 return executions
201
202 except DevOpsAgentError as e:
203 print(f"β Failed to list executions: {e}")
204 return []
205
206
207def fetch_journal_records_for_execution(client, agent_space_id: str, execution_id: str) -> List[JournalRecord]:
208 """Fetch all journal records for a specific execution."""
209 records = []
210 next_token = None
211
212 try:
213 while True:
214 # Build request parameters
215 request_params = {
216 'agentSpaceId': agent_space_id,
217 'executionId': execution_id,
218 'limit': 100 # Get up to 100 records at a time
219 }
220 if next_token:
221 request_params['nextToken'] = next_token
222
223 response = client.get_journal_records(**request_params)
224
225 batch = response.get('records', [])
226 records.extend(batch)
227
228 next_token = response.get('nextToken')
229 if not next_token:
230 break
231
232 return records
233
234 except DevOpsAgentError as e:
235 print(f" β Failed to fetch records for execution {execution_id[:8]}: {e}")
236 return []
237
238
239def get_content_summary(record: JournalRecord) -> str:
240 """Extract a meaningful summary from journal record content."""
241 try:
242 # Use the record type from the top level
243 record_type = record.get('recordType', 'unknown')
244
245 # Parse the content JSON
246 data = json.loads(record['content'])
247
248 if record_type == 'message':
249 # Extract message content - content is a list of blocks
250 content_blocks = data.get('content', [])
251 role = data.get('role', 'unknown')
252 if content_blocks and isinstance(content_blocks, list):
253 for block in content_blocks:
254 if isinstance(block, dict) and 'text' in block:
255 text = block['text'][:60]
256 return f"[{role}] {text}"
257 elif isinstance(block, str):
258 return f"[{role}] {block[:60]}"
259 # Try other possible message fields
260 if 'text' in data:
261 return f"[{role}] {data['text'][:60]}"
262 elif 'message' in data:
263 return f"[{role}] {data['message'][:60]}"
264 return f"[{role}] Message"
265
266 elif record_type == 'plan':
267 return f"Plan: {data.get('title', 'Untitled plan')}"
268
269 elif record_type == 'activity':
270 return f"Activity: {data.get('title', 'Untitled activity')} ({data.get('status', 'unknown')})"
271
272 elif record_type == 'observation':
273 return f"Observation: {data.get('title', 'Untitled observation')}"
274
275 elif record_type == 'symptom':
276 return f"Symptom: {data.get('title', 'Untitled symptom')}"
277
278 elif record_type == 'finding':
279 return f"Finding: {data.get('title', 'Untitled finding')}"
280
281 elif record_type == 'investigation_summary':
282 return f"Investigation Summary: {data.get('title', 'Summary')}"
283
284 elif record_type == 'mitigation_summary':
285 return f"Mitigation Summary: {data.get('title', 'Summary')}"
286
287 else:
288 # Generic fallback - show key fields
289 keys = list(data.keys())
290 if 'title' in data:
291 return f"{record_type}: {data['title']}"
292 elif 'description' in data:
293 return f"{record_type}: {data['description'][:50]}..."
294 elif keys:
295 return f"{record_type}: {keys[0]}={str(data[keys[0]])[:30]}..."
296 else:
297 return f"{record_type} record"
298
299 except Exception as e:
300 # Fallback: try to extract from raw JSON
301 try:
302 data = json.loads(record['content'])
303
304 # Handle different record types based on content
305 content_type = data.get('type', 'unknown')
306
307 if content_type == 'message':
308 content_blocks = data.get('content', [])
309 role = data.get('role', 'unknown')
310 if content_blocks and isinstance(content_blocks, list):
311 for block in content_blocks:
312 if isinstance(block, dict) and 'text' in block:
313 text = block['text'][:60]
314 return f"[{role}] {text}"
315 return f"[{role}] Message"
316
317 elif 'title' in data:
318 return f"{content_type}: {data['title']}"
319 elif 'description' in data:
320 return f"{content_type}: {data['description'][:50]}..."
321 else:
322 return f"{content_type} record"
323
324 except Exception as parse_error:
325 # Last resort: show raw content
326 raw_content = record['content'][:100] + "..." if len(record['content']) > 100 else record['content']
327 return f"Raw: {raw_content}"
328
329
330def format_timestamp(timestamp: float) -> str:
331 """Format Unix timestamp to human-readable string."""
332 dt = datetime.fromtimestamp(timestamp)
333 return dt.strftime('%Y-%m-%d %H:%M:%S')
334
335
336def display_goal_journal_timeline(goal_data: Dict[str, Any]):
337 """Display journal records organized by goal in a detailed timeline format."""
338 if not goal_data:
339 print("β No goal data found")
340 return
341
342 print(f"\nπ― Goal Journal Timeline ({len(goal_data)} goals)")
343 print("=" * 120)
344
345 total_records = 0
346 total_executions = 0
347
348 for goal_info in goal_data:
349 goal = goal_info['goal']
350 task = goal_info.get('task')
351 executions = goal_info.get('executions', [])
352 records = goal_info.get('records', [])
353
354 goal_id = goal['goalId']
355 goal_title = goal['title']
356 goal_type = goal['goalType']
357 goal_status = goal['status']
358
359 print(f"\nπ― GOAL: {goal_title}")
360 print(f" ID: {goal_id} | Type: {goal_type} | Status: {goal_status}")
361 print(f" Task: {task['taskId'][:8] if task else 'None'} | Executions: {len(executions)} | Records: {len(records)}")
362 print("-" * 110)
363
364 if not records:
365 print(" βΉοΈ No journal records found for this goal")
366 continue
367
368 total_records += len(records)
369 total_executions += len(executions)
370
371 # Display records for this goal
372 for i, record in enumerate(records, 1):
373 timestamp = format_timestamp(record['createdAt'])
374 execution_id = record['executionId']
375 record_type = record['recordType']
376 record_id = record.get('recordId', 'N/A')
377
378 print(f"\n π Record {i:2d} | {timestamp}")
379 print(f" π ID: {record_id} | βοΈ Exec: {execution_id}")
380 print(f" π Type: {record_type}")
381
382 # Show content summary
383 summary = get_content_summary(record)
384 print(f" π¬ {summary}")
385
386 print()
387
388 print("=" * 120)
389 print("π SUMMARY:")
390 print(f" Goals processed: {len(goal_data)}")
391 print(f" Total executions: {total_executions}")
392 print(f" Total journal records: {total_records}")
393
394
395def interactive_goal_selection(client, agent_space_id):
396 """Allow user to select goals interactively for demonstration."""
397 print("\nπ― Goal Selection for Journal Timeline")
398 print("-" * 42)
399
400 try:
401 # Get recent goals
402 response = client.list_goals(agentSpaceId=agent_space_id, limit=10)
403
404 goals = response.get('goals', [])
405 if not goals:
406 print("β No goals found in this agent space")
407 print(" Please create some goals first, or check your agent space ID")
408 return []
409
410 print("Available goals:")
411 for i, goal in enumerate(goals, 1):
412 created = datetime.fromisoformat(goal['createdAt'].replace('Z', '+00:00'))
413 task_status = "Has Task" if goal.get('lastTaskId') else "No Task"
414 print(f" {i}. {goal['title'][:40]}... ({goal['goalType']}, {goal['status']}) - {task_status}")
415
416 # For demo purposes, process all goals
417 print(f"\nπ Processing all {len(goals)} goals for journal timeline...")
418 return goals
419
420 except DevOpsAgentError as e:
421 print(f"β Failed to list goals: {e}")
422 return []
423
424
425def main():
426 """Main function to fetch and display goal journal timeline."""
427 print("π― Community DevOps Agent API - Goal Journal Timeline")
428 print("=" * 65)
429
430 # Get configuration
431 config = get_config()
432 config.print_configuration()
433
434 # Check service registration
435 if not devopsagent_api.is_service_registered():
436 print("β Service registration failed")
437 return
438
439 # Get agent space ID
440 agent_space_id = config.agent_space_id or get_agent_space_id()
441 print(f"Using agent space: {agent_space_id[:8]}...")
442
443 # Create client
444 try:
445 client = config.get_client()
446 print("β
Client created successfully")
447 except Exception as e:
448 print(f"β Failed to create client: {e}")
449 return
450
451 # Step 1: List all goals
452 goals = interactive_goal_selection(client, agent_space_id)
453 if not goals:
454 print("\nβ Cannot continue without goals to work with")
455 return
456
457 # Step 2: Process each goal - get/create task, get executions, get journal records
458 goal_data = []
459
460 for i, goal in enumerate(goals, 1):
461 goal_id = goal['goalId']
462 goal_title = goal['title']
463
464 print(f"\n{'='*80}")
465 print(f"π― Processing Goal {i}/{len(goals)}: {goal_title[:50]}...")
466 print(f" Goal ID: {goal_id}")
467 print(f"{'='*80}")
468
469 # Get or create task for this goal
470 task = get_task_for_goal(client, agent_space_id, goal)
471 if not task:
472 print(f"β Skipping goal {goal_id[:8]} - no task available")
473 continue
474
475 # Get executions for this task
476 executions = list_executions_for_task(client, agent_space_id, task['taskId'])
477 if not executions:
478 print(f"βΉοΈ No executions found for task {task['taskId'][:8]}")
479 goal_data.append({
480 'goal': goal,
481 'task': task,
482 'executions': [],
483 'records': []
484 })
485 continue
486
487 # Fetch journal records for each execution
488 print(f"\nπ Fetching journal records for {len(executions)} executions...")
489 print("-" * 65)
490
491 all_records = []
492 for j, execution in enumerate(executions, 1):
493 exec_id = execution['executionId']
494 print(f" [{j}/{len(executions)}] Processing execution: {exec_id[:8]}...")
495
496 records = fetch_journal_records_for_execution(client, agent_space_id, exec_id)
497
498 # Add execution ID to each record for timeline context
499 for record in records:
500 record_with_exec = dict(record)
501 record_with_exec['executionId'] = exec_id
502 all_records.append(record_with_exec)
503
504 print(f" Found {len(records)} journal records")
505
506 # Sort all records for this goal chronologically
507 all_records.sort(key=lambda x: x['createdAt'])
508
509 goal_data.append({
510 'goal': goal,
511 'task': task,
512 'executions': executions,
513 'records': all_records
514 })
515
516 print(f" β
Goal {goal_id[:8]} processed: {len(executions)} executions, {len(all_records)} records")
517
518 # Step 3: Display comprehensive timeline
519 display_goal_journal_timeline(goal_data)
520
521 print("\nπ Goal journal timeline generation complete!")
522 print("\nπ‘ Key Takeaways:")
523 print("β’ Goals can have associated evaluation tasks via lastTaskId")
524 print("β’ Tasks are automatically created for goals that need evaluation")
525 print("β’ Journal records show the complete execution history for each goal")
526 print("β’ Timeline provides chronological view of goal progress")
527 print("β’ Different goal types (ONCALL_REPORT, CUSTOMER_DEFINED, PREDEFINED) have different workflows")
528
529 print("\nπ Record Types Legend:")
530 print("β’ message: User/assistant interactions")
531 print("β’ plan: Planned activities")
532 print("β’ activity: Task execution steps")
533 print("β’ observation: System observations")
534 print("β’ symptom: Identified issues")
535 print("β’ finding: Analysis results")
536 print("β’ investigation_summary: Final summary")
537
538
539if __name__ == "__main__":
540 main()
Journal Timelineο
1#!/usr/bin/env python3
2"""
3Journal Records Timeline - Fetch and display journal records from all executions.
4
5This script allows interactive selection of a task, retrieves all executions for that task,
6fetches journal records from each execution, and displays them in a chronological timeline.
7"""
8
9import os
10import sys
11import json
12from datetime import datetime
13from typing import List, Dict, Any
14import boto3
15import devopsagent_api
16from devopsagent_api.models import JournalRecord, Execution, parse_journal_content
17from devopsagent_api.exceptions import DevOpsAgentError
18from config import get_config
19
20
21def get_agent_space_id():
22 """Get agent space ID from environment or prompt user."""
23 agent_space_id = os.getenv('AGENT_SPACE_ID')
24 if not agent_space_id:
25 print("β AGENT_SPACE_ID environment variable not set")
26 print(" Please set it with: export AGENT_SPACE_ID=your-agent-space-uuid")
27 print(" Or run: AGENT_SPACE_ID=your-uuid python examples/journal_timeline.py")
28 sys.exit(1)
29 return agent_space_id
30
31
32def list_executions_for_task(client, agent_space_id: str, task_id: str) -> List[Execution]:
33 """List all executions for a specific task."""
34 print(f"\nπ Listing executions for task: {task_id[:8]}...")
35 print("-" * 50)
36
37 executions = []
38 next_token = None
39
40 try:
41 while True:
42 # Build request parameters
43 request_params = {
44 'agentSpaceId': agent_space_id,
45 'taskId': task_id,
46 'limit': 50 # Get up to 50 executions at a time
47 }
48 if next_token:
49 request_params['nextToken'] = next_token
50
51 response = client.list_executions(**request_params)
52
53 batch = response.get('executions', [])
54 executions.extend(batch)
55
56 print(f" Found {len(batch)} executions in this batch")
57
58 next_token = response.get('nextToken')
59 if not next_token:
60 break
61
62 print(f"β
Total executions found: {len(executions)}")
63
64 # Sort executions by creation time
65 executions.sort(key=lambda x: x['createdAt'])
66
67 return executions
68
69 except DevOpsAgentError as e:
70 print(f"β Failed to list executions: {e}")
71 return []
72
73
74def fetch_journal_records_for_execution(client, agent_space_id: str, execution_id: str) -> List[JournalRecord]:
75 """Fetch all journal records for a specific execution."""
76 records = []
77 next_token = None
78
79 try:
80 while True:
81 # Build request parameters
82 request_params = {
83 'agentSpaceId': agent_space_id,
84 'executionId': execution_id,
85 'limit': 100 # Get up to 100 records at a time
86 }
87 if next_token:
88 request_params['nextToken'] = next_token
89
90 response = client.get_journal_records(**request_params)
91
92 batch = response.get('records', [])
93 records.extend(batch)
94
95 next_token = response.get('nextToken')
96 if not next_token:
97 break
98
99 return records
100
101 except DevOpsAgentError as e:
102 print(f" β Failed to fetch records for execution {execution_id[:8]}: {e}")
103 return []
104
105
106def get_content_summary(record: JournalRecord) -> str:
107 """Extract a meaningful summary from journal record content."""
108 try:
109 # Use the record type from the top level
110 record_type = record.get('recordType', 'unknown')
111
112 # Parse the content JSON
113 data = json.loads(record['content'])
114
115 if record_type == 'message':
116 # Extract message content - content is a list of blocks
117 content_blocks = data.get('content', [])
118 role = data.get('role', 'unknown')
119 if content_blocks and isinstance(content_blocks, list):
120 for block in content_blocks:
121 if isinstance(block, dict) and 'text' in block:
122 text = block['text'][:60]
123 return f"[{role}] {text}"
124 elif isinstance(block, str):
125 return f"[{role}] {block[:60]}"
126 # Try other possible message fields
127 if 'text' in data:
128 return f"[{role}] {data['text'][:60]}"
129 elif 'message' in data:
130 return f"[{role}] {data['message'][:60]}"
131 return f"[{role}] Message"
132
133 elif record_type == 'plan':
134 return f"Plan: {data.get('title', 'Untitled plan')}"
135
136 elif record_type == 'activity':
137 return f"Activity: {data.get('title', 'Untitled activity')} ({data.get('status', 'unknown')})"
138
139 elif record_type == 'observation':
140 return f"Observation: {data.get('title', 'Untitled observation')}"
141
142 elif record_type == 'symptom':
143 return f"Symptom: {data.get('title', 'Untitled symptom')}"
144
145 elif record_type == 'finding':
146 return f"Finding: {data.get('title', 'Untitled finding')}"
147
148 elif record_type == 'investigation_summary':
149 return f"Investigation Summary: {data.get('title', 'Summary')}"
150
151 elif record_type == 'mitigation_summary':
152 return f"Mitigation Summary: {data.get('title', 'Summary')}"
153
154 else:
155 # Generic fallback - show key fields
156 keys = list(data.keys())
157 if 'title' in data:
158 return f"{record_type}: {data['title']}"
159 elif 'description' in data:
160 return f"{record_type}: {data['description'][:50]}..."
161 elif keys:
162 return f"{record_type}: {keys[0]}={str(data[keys[0]])[:30]}..."
163 else:
164 return f"{record_type} record"
165
166 except Exception as e:
167 # Fallback: try to extract from raw JSON
168 try:
169 data = json.loads(record['content'])
170
171 # Handle different record types based on content
172 content_type = data.get('type', 'unknown')
173
174 if content_type == 'message':
175 content_blocks = data.get('content', [])
176 role = data.get('role', 'unknown')
177 if content_blocks and isinstance(content_blocks, list):
178 for block in content_blocks:
179 if isinstance(block, dict) and 'text' in block:
180 text = block['text'][:60]
181 return f"[{role}] {text}"
182 return f"[{role}] Message"
183
184 elif 'title' in data:
185 return f"{content_type}: {data['title']}"
186 elif 'description' in data:
187 return f"{content_type}: {data['description'][:50]}..."
188 else:
189 return f"{content_type} record"
190
191 except Exception as parse_error:
192 # Last resort: show raw content
193 raw_content = record['content'][:100] + "..." if len(record['content']) > 100 else record['content']
194 return f"Raw: {raw_content}"
195
196
197def format_timestamp(timestamp: float) -> str:
198 """Format Unix timestamp to human-readable string."""
199 dt = datetime.fromtimestamp(timestamp)
200 return dt.strftime('%Y-%m-%d %H:%M:%S')
201
202
203def get_detailed_content(record: JournalRecord) -> str:
204 """Extract detailed content from journal record for rich display."""
205 try:
206 record_type = record.get('recordType', 'unknown')
207 data = json.loads(record['content'])
208
209 if record_type == 'message':
210 role = data.get('role', 'unknown')
211 content_blocks = data.get('content', [])
212 user_ref = data.get('user_reference')
213
214 result = f"Role: {role}\n"
215 if user_ref:
216 result += f"User Reference: {user_ref}\n"
217
218 result += "Content:\n"
219 if content_blocks and isinstance(content_blocks, list):
220 has_content = False
221 for i, block in enumerate(content_blocks):
222 if isinstance(block, dict) and 'text' in block and block['text'].strip():
223 result += f" [{i+1}] {block['text']}\n"
224 has_content = True
225 elif isinstance(block, str) and block.strip():
226 result += f" [{i+1}] {block}\n"
227 has_content = True
228
229 if not has_content:
230 # If no text content, show the full JSON structure
231 result += " (JSON payload):\n"
232 result += json.dumps(data, indent=4) + "\n"
233 else:
234 # No content blocks, show full JSON
235 result += " (JSON payload):\n"
236 result += json.dumps(data, indent=4) + "\n"
237
238 return result.rstrip()
239
240 else:
241 # For non-message records, show structured data
242 result = ""
243 for key, value in data.items():
244 if key == 'content' and isinstance(value, list):
245 result += f"{key}:\n"
246 for i, item in enumerate(value):
247 result += f" [{i+1}] {item}\n"
248 elif isinstance(value, (dict, list)):
249 result += f"{key}:\n{json.dumps(value, indent=4)}\n"
250 else:
251 result += f"{key}: {value}\n"
252 return result.rstrip()
253
254 except Exception as e:
255 # Fallback to raw JSON with formatting
256 try:
257 data = json.loads(record['content'])
258 return json.dumps(data, indent=2)
259 except:
260 return record['content']
261
262
263def display_detailed_timeline(all_records: List[Dict[str, Any]]):
264 """Display journal records in a detailed, multi-line format."""
265 if not all_records:
266 print("β No journal records found")
267 return
268
269 print(f"\nπ Detailed Journal Records Timeline ({len(all_records)} total records)")
270 print("=" * 120)
271
272 for i, record in enumerate(all_records, 1):
273 timestamp = format_timestamp(record['createdAt'])
274 execution_id = record['executionId']
275 record_type = record['recordType']
276 record_id = record.get('recordId', 'N/A')
277 agent_space_id = record.get('agentSpaceId', 'N/A')
278
279 # Compact header with all IDs
280 print(f"\n{'='*110}")
281 print(f"π Record {i:2d} | {timestamp}")
282 print(f"π ID: {record_id} | π’ Space: {agent_space_id} | βοΈ Exec: {execution_id}")
283 print(f"π Type: {record_type}")
284 print(f"{'='*110}")
285
286 # Display detailed content
287 detailed_content = get_detailed_content(record)
288 print("π Content:")
289 # Split content into lines and indent
290 for line in detailed_content.split('\n'):
291 if line.strip(): # Only print non-empty lines
292 print(f" {line}")
293
294 print()
295
296 print("=" * 120)
297
298
299def interactive_task_selection(client, agent_space_id):
300 """Allow user to select a task interactively for journal timeline."""
301 print("\nπ― Task Selection for Journal Timeline")
302 print("-" * 42)
303
304 try:
305 # Get recent tasks
306 response = client.list_tasks(
307 agentSpaceId=agent_space_id,
308 limit=10,
309 sortField='CREATED_AT',
310 order='DESC'
311 )
312
313 tasks = response.get('tasks', [])
314 if not tasks:
315 print("β No tasks found in this agent space")
316 print(" Please create some tasks first, or check your agent space ID")
317 return None
318
319 print("Available tasks (recently created):")
320 for i, task in enumerate(tasks, 1):
321 created = datetime.fromisoformat(task['createdAt'].replace('Z', '+00:00'))
322 print(f" {i}. {task['title'][:50]}... ({task['status']}, {task['taskType']}) - {created.strftime('%Y-%m-%d %H:%M')}")
323
324 # Let user choose
325 while True:
326 try:
327 choice = input(f"\nSelect a task (1-{len(tasks)}) or press Enter for the most recent: ").strip()
328 if not choice:
329 selected_task = tasks[0]
330 print(f"π Using most recent task: {selected_task['title'][:40]}...")
331 break
332 else:
333 choice_idx = int(choice) - 1
334 if 0 <= choice_idx < len(tasks):
335 selected_task = tasks[choice_idx]
336 print(f"π Selected task: {selected_task['title'][:40]}...")
337 break
338 else:
339 print(f"β Invalid choice. Please enter 1-{len(tasks)} or press Enter.")
340 except ValueError:
341 print("β Invalid input. Please enter a number or press Enter.")
342
343 return selected_task['taskId']
344
345 except DevOpsAgentError as e:
346 print(f"β Failed to list tasks: {e}")
347 return None
348
349
350def main():
351 """Main function to fetch and display journal timeline."""
352 print("π Community DevOps Agent API - Journal Records Timeline")
353 print("=" * 65)
354
355 # Get configuration
356 config = get_config()
357 config.print_configuration()
358
359 # Check service registration
360 if not devopsagent_api.is_service_registered():
361 print("β Service registration failed")
362 return
363
364 # Get agent space ID
365 agent_space_id = config.agent_space_id or get_agent_space_id()
366 print(f"Using agent space: {agent_space_id[:8]}...")
367
368 # Create client
369 try:
370 client = config.get_client()
371 print("β
Client created successfully")
372 except Exception as e:
373 print(f"β Failed to create client: {e}")
374 return
375
376 # Select a task interactively
377 task_id = interactive_task_selection(client, agent_space_id)
378 if not task_id:
379 print("\nβ Cannot continue without a task to work with")
380 return
381
382 print(f"Fetching journal timeline for selected task...")
383
384 # Step 1: List all executions for the task
385 executions = list_executions_for_task(client, agent_space_id, task_id)
386
387 if not executions:
388 print("β No executions found for this task")
389 return
390
391 # Step 2: Fetch journal records for each execution
392 print("\nπ Fetching journal records for all executions...")
393 print("-" * 55)
394
395 all_records = []
396 for i, execution in enumerate(executions, 1):
397 exec_id = execution['executionId']
398 print(f" [{i}/{len(executions)}] Processing execution: {exec_id[:8]}...")
399
400 records = fetch_journal_records_for_execution(client, agent_space_id, exec_id)
401
402 # Add execution ID to each record for timeline context
403 for record in records:
404 record_with_exec = dict(record)
405 record_with_exec['executionId'] = exec_id
406 all_records.append(record_with_exec)
407
408 print(f" Found {len(records)} journal records")
409
410 # Step 3: Sort all records chronologically
411 print(f"\nπ Sorting {len(all_records)} total records chronologically...")
412 all_records.sort(key=lambda x: x['createdAt'])
413
414 # Step 4: Display detailed timeline
415 display_detailed_timeline(all_records)
416
417 print("\nπ Journal timeline generation complete!")
418 print(f"\nπ‘ Summary:")
419 print(f"β’ Task ID: {task_id}")
420 print(f"β’ Total executions: {len(executions)}")
421 print(f"β’ Total journal records: {len(all_records)}")
422
423 if executions:
424 print(f"β’ Execution IDs: {', '.join([e['executionId'][:8] + '...' for e in executions])}")
425
426 print("\nπ Record Types Legend:")
427 print("β’ message: User/assistant interactions")
428 print("β’ plan: Planned activities")
429 print("β’ activity: Task execution steps")
430 print("β’ observation: System observations")
431 print("β’ symptom: Identified issues")
432 print("β’ finding: Analysis results")
433 print("β’ investigation_summary: Final summary")
434
435
436if __name__ == "__main__":
437 main()
Configurationο
1#!/usr/bin/env python3
2"""
3Configuration utilities for DevOps Agent API examples.
4
5This module provides centralized configuration management for examples,
6including environment variable handling, service metadata discovery,
7and sensible defaults.
8"""
9
10import os
11import boto3
12import devopsagent_api
13from typing import Dict, List, Optional, Any
14
15
16class ExampleConfig:
17 """Configuration manager for examples."""
18
19 def __init__(self):
20 """Initialize configuration with environment variables and defaults."""
21 self._region = os.getenv('AWS_REGION', 'us-east-1')
22 self._agent_space_id = os.getenv('AGENT_SPACE_ID')
23 self._user_id = os.getenv('USER_ID', 'example-user')
24 self._poll_interval = int(os.getenv('POLL_INTERVAL', '2'))
25 self._default_limit = int(os.getenv('DEFAULT_LIMIT', '10'))
26 self._timeout_seconds = int(os.getenv('TIMEOUT_SECONDS', '60'))
27 self._max_checks = int(os.getenv('MAX_CHECKS', '10'))
28
29 # Cached service metadata
30 self._waiters = None
31 self._paginators = None
32 self._operations = None
33
34 @property
35 def region(self) -> str:
36 """Get AWS region."""
37 return self._region
38
39 @property
40 def agent_space_id(self) -> Optional[str]:
41 """Get agent space ID."""
42 return self._agent_space_id
43
44 @property
45 def user_id(self) -> str:
46 """Get user ID."""
47 return self._user_id
48
49 @property
50 def poll_interval(self) -> int:
51 """Get polling interval in seconds."""
52 return self._poll_interval
53
54 @property
55 def default_limit(self) -> int:
56 """Get default pagination limit."""
57 return self._default_limit
58
59 @property
60 def timeout_seconds(self) -> int:
61 """Get default timeout in seconds."""
62 return self._timeout_seconds
63
64 @property
65 def max_checks(self) -> int:
66 """Get maximum number of status checks."""
67 return self._max_checks
68
69 def get_client(self) -> boto3.client:
70 """Create and return a configured boto3 client."""
71 return boto3.client('community-devops-agent', region_name=self.region)
72
73 def get_available_waiters(self) -> List[str]:
74 """Get list of available waiters from service metadata."""
75 if self._waiters is None:
76 try:
77 client = self.get_client()
78 # Try to get waiter names by attempting to create them
79 potential_waiters = [
80 'task_completed', 'task_failed', 'task_started',
81 'execution_completed', 'execution_failed'
82 ]
83 available = []
84 for waiter_name in potential_waiters:
85 try:
86 client.get_waiter(waiter_name)
87 available.append(waiter_name)
88 except Exception:
89 continue
90 self._waiters = available
91 except Exception:
92 # Fallback to known waiters
93 self._waiters = ['task_completed', 'task_failed', 'task_started']
94
95 return self._waiters
96
97 def get_available_paginators(self) -> List[str]:
98 """Get list of available paginators from service metadata."""
99 if self._paginators is None:
100 try:
101 client = self.get_client()
102 potential_paginators = [
103 'list_tasks', 'list_executions', 'get_journal_records',
104 'list_goals', 'list_recommendations'
105 ]
106 available = []
107 for paginator_name in potential_paginators:
108 try:
109 client.get_paginator(paginator_name)
110 available.append(paginator_name)
111 except Exception:
112 continue
113 self._paginators = available
114 except Exception:
115 # Fallback to known paginators
116 self._paginators = ['list_tasks', 'list_executions', 'get_journal_records']
117
118 return self._paginators
119
120 def get_operations_by_category(self) -> Dict[str, List[str]]:
121 """Get operations grouped by category."""
122 if self._operations is None:
123 try:
124 client = self.get_client()
125 operations = client.meta.service_model.operation_names
126
127 # Group operations by category
128 task_ops = [op for op in operations if 'task' in op.lower()]
129 journal_ops = [op for op in operations if any(x in op.lower() for x in ['journal', 'execution'])]
130 goal_ops = [op for op in operations if any(x in op.lower() for x in ['goal', 'recommendation'])]
131 other_ops = [op for op in operations if op not in task_ops + journal_ops + goal_ops]
132
133 self._operations = {
134 'Task Management': sorted(task_ops),
135 'Journal & Execution': sorted(journal_ops),
136 'Goals & Recommendations': sorted(goal_ops),
137 'Topology & Support': sorted(other_ops)
138 }
139 except Exception:
140 # Fallback to known operations
141 self._operations = {
142 'Task Management': ['CreateTask', 'GetTask', 'ListTasks', 'UpdateTask'],
143 'Journal & Execution': ['GetJournalRecords', 'ListExecutions'],
144 'Goals & Recommendations': ['ListGoals', 'ListRecommendations', 'GetRecommendation', 'UpdateRecommendation'],
145 'Topology & Support': ['QueryTopology', 'GetSupportLevel', 'CreateSupportChat', 'EndSupportChat']
146 }
147
148 return self._operations
149
150 def validate_configuration(self) -> List[str]:
151 """Validate current configuration and return any issues."""
152 issues = []
153
154 if not self.agent_space_id:
155 issues.append("AGENT_SPACE_ID environment variable not set (required for most operations)")
156
157 if not devopsagent_api.is_service_registered():
158 issues.append("Service registration failed - check dependencies")
159
160 return issues
161
162 def print_configuration(self):
163 """Print current configuration for debugging."""
164 print("π§ Example Configuration")
165 print("-" * 30)
166 print(f"AWS Region: {self.region}")
167 print(f"Agent Space ID: {self.agent_space_id or 'Not set'}")
168 print(f"User ID: {self.user_id}")
169 print(f"Poll Interval: {self.poll_interval}s")
170 print(f"Default Limit: {self.default_limit}")
171 print(f"Timeout: {self.timeout_seconds}s")
172 print(f"Max Checks: {self.max_checks}")
173
174 issues = self.validate_configuration()
175 if issues:
176 print("\nβ οΈ Configuration Issues:")
177 for issue in issues:
178 print(f" β’ {issue}")
179 else:
180 print("\nβ
Configuration looks good")
181
182
183# Global configuration instance
184config = ExampleConfig()
185
186
187def get_config() -> ExampleConfig:
188 """Get the global configuration instance."""
189 return config
Running Examplesο
All examples can be run directly:
# Basic usage
python examples/basic_usage.py
# List tasks
python examples/list_tasks.py
# Interactive monitoring
python examples/interactive_monitoring_enhanced.py
Make sure to set your AGENT_SPACE_ID environment variable before running examples that require it.
Example Outputο
Hereβs what you can expect from running the basic usage example:
Configuration:
- Agent Space ID: your-agent-space-uuid
- AWS Region: us-east-1
- User ID: 4be32b4a-9675-4dc0-97ff-7126ad28457c
- Poll Interval: 2s
- Default Limit: 10
- Timeout: 60s
- Max Checks: 10
Service registered successfully
Client created successfully
Listing tasks...
Found 3 tasks
Task 1:
- ID: task-123
- Title: Deploy application
- Status: COMPLETED
- Created: 2025-12-09T10:30:00Z
Task 2:
- ID: task-456
- Title: Run tests
- Status: IN_PROGRESS
- Created: 2025-12-09T11:15:00Z
Task 3:
- ID: task-789
- Title: Update documentation
- Status: PENDING
- Created: 2025-12-09T12:00:00Z