8f48dd838b
Replace task statuses (pending/running/succeeded/failed/cancelled) with
a kanban workflow: todo → in_progress → in_review → done/wontdo.
When a non-review agent task completes, the API auto-creates a child
review task and moves the parent to in_review. Only humans can move
tasks from in_review to done/wontdo via the TUI.
New components:
- cmd/tui: bubbletea kanban board with $EDITOR integration
- POST /api/v1/tasks/{id}/complete: agent completion callback
- Operator --api-url flag for completion callbacks
- ProviderQueue sets tasks to in_progress on pickup
- AgentTask reconciler calls /complete on job finish
219 lines
6.8 KiB
Go
219 lines
6.8 KiB
Go
package controller
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	forgebotv1alpha1 "git.unkin.net/unkin/forgebot/api/v1alpha1"
	"git.unkin.net/unkin/forgebot/pkg/models"
)
|
|
|
|
// AgentTaskReconciler reconciles AgentTask objects. It runs each task as a
// batch Job on the task's AgentPool, records the Job's outcome in the task
// status, and reports completion back to the forgebot API.
type AgentTaskReconciler struct {
	// Client is the controller-runtime client used for all API-server access.
	client.Client
	// Scheme is used to set controller (owner) references on created Jobs.
	Scheme *runtime.Scheme
	// APIURL is the base URL of the forgebot API that receives completion
	// callbacks; when empty, no callbacks are sent.
	APIURL string
	// HTTPClient sends completion callbacks; http.DefaultClient is used when nil.
	HTTPClient *http.Client
}
|
|
|
|
// +kubebuilder:rbac:groups=forgebot.unkin.net,resources=agenttasks,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=forgebot.unkin.net,resources=agenttasks/status,verbs=get;update;patch
|
|
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete
|
|
|
|
func (r *AgentTaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
var task forgebotv1alpha1.AgentTask
|
|
if err := r.Get(ctx, req.NamespacedName, &task); err != nil {
|
|
return ctrl.Result{}, client.IgnoreNotFound(err)
|
|
}
|
|
|
|
switch task.Status.Phase {
|
|
case forgebotv1alpha1.TaskPending, "":
|
|
return r.handlePending(ctx, &task)
|
|
case forgebotv1alpha1.TaskRunning:
|
|
return r.handleRunning(ctx, &task)
|
|
default:
|
|
logger.V(1).Info("task in terminal state", "phase", task.Status.Phase)
|
|
return ctrl.Result{}, nil
|
|
}
|
|
}
|
|
|
|
func (r *AgentTaskReconciler) handlePending(ctx context.Context, task *forgebotv1alpha1.AgentTask) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
var pool forgebotv1alpha1.AgentPool
|
|
if err := r.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Spec.PoolRef}, &pool); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("get pool %s: %w", task.Spec.PoolRef, err)
|
|
}
|
|
|
|
if pool.Status.ActiveJobs >= pool.Spec.MaxConcurrent {
|
|
logger.Info("pool at capacity, requeueing", "pool", pool.Name, "active", pool.Status.ActiveJobs)
|
|
return ctrl.Result{RequeueAfter: 10_000_000_000}, nil // 10s
|
|
}
|
|
|
|
job := r.buildJob(task, &pool)
|
|
if err := ctrl.SetControllerReference(task, job, r.Scheme); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
if err := r.Create(ctx, job); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("create job: %w", err)
|
|
}
|
|
|
|
now := metav1.Now()
|
|
task.Status.Phase = forgebotv1alpha1.TaskRunning
|
|
task.Status.JobName = job.Name
|
|
task.Status.StartTime = &now
|
|
if err := r.Status().Update(ctx, task); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
logger.Info("created job for task", "job", job.Name, "task", task.Name)
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *AgentTaskReconciler) handleRunning(ctx context.Context, task *forgebotv1alpha1.AgentTask) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx)
|
|
|
|
var job batchv1.Job
|
|
if err := r.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Status.JobName}, &job); err != nil {
|
|
return ctrl.Result{}, client.IgnoreNotFound(err)
|
|
}
|
|
|
|
if job.Status.Succeeded > 0 {
|
|
now := metav1.Now()
|
|
task.Status.Phase = forgebotv1alpha1.TaskSucceeded
|
|
task.Status.EndTime = &now
|
|
if err := r.Status().Update(ctx, task); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
r.completeAPITask(ctx, task, models.CompleteTaskRequest{})
|
|
logger.Info("task succeeded", "task", task.Name)
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
if job.Status.Failed > 0 {
|
|
now := metav1.Now()
|
|
task.Status.Phase = forgebotv1alpha1.TaskFailed
|
|
task.Status.EndTime = &now
|
|
task.Status.Message = "job failed"
|
|
if err := r.Status().Update(ctx, task); err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
r.completeAPITask(ctx, task, models.CompleteTaskRequest{ErrorMessage: "job failed"})
|
|
logger.Info("task failed", "task", task.Name)
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
return ctrl.Result{RequeueAfter: 15_000_000_000}, nil // 15s
|
|
}
|
|
|
|
func (r *AgentTaskReconciler) buildJob(task *forgebotv1alpha1.AgentTask, pool *forgebotv1alpha1.AgentPool) *batchv1.Job {
|
|
backoffLimit := int32(0)
|
|
ttl := int32(3600)
|
|
|
|
env := []corev1.EnvVar{
|
|
{Name: "FORGEBOT_REPO", Value: task.Spec.Repository},
|
|
{Name: "FORGEBOT_REF", Value: task.Spec.Ref},
|
|
{Name: "FORGEBOT_COMMAND", Value: task.Spec.Command},
|
|
{Name: "FORGEBOT_SKILL", Value: task.Spec.Skill},
|
|
{Name: "FORGEBOT_TASK_ID", Value: task.Name},
|
|
{Name: "FORGEBOT_MODEL", Value: pool.Spec.Model},
|
|
{Name: "FORGEBOT_BODY", Value: task.Spec.Context.Body},
|
|
{Name: "FORGEBOT_AUTHOR", Value: task.Spec.Context.Author},
|
|
{Name: "FORGEBOT_ISSUE_NUMBER", Value: fmt.Sprintf("%d", task.Spec.Context.IssueNumber)},
|
|
{Name: "FORGEBOT_PR_NUMBER", Value: fmt.Sprintf("%d", task.Spec.Context.PRNumber)},
|
|
{Name: "ANTHROPIC_BASE_URL", Value: pool.Spec.Endpoint},
|
|
}
|
|
|
|
if pool.Spec.CredentialSecretRef.Name != "" {
|
|
env = append(env, corev1.EnvVar{
|
|
Name: "ANTHROPIC_API_KEY",
|
|
ValueFrom: &corev1.EnvVarSource{
|
|
SecretKeyRef: &corev1.SecretKeySelector{
|
|
LocalObjectReference: pool.Spec.CredentialSecretRef,
|
|
Key: "api-key",
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
return &batchv1.Job{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: fmt.Sprintf("forgebot-%s", task.Name),
|
|
Namespace: task.Namespace,
|
|
Labels: map[string]string{
|
|
"app.kubernetes.io/managed-by": "forgebot",
|
|
"forgebot.unkin.net/task": task.Name,
|
|
"forgebot.unkin.net/pool": pool.Name,
|
|
"forgebot.unkin.net/command": task.Spec.Command,
|
|
},
|
|
},
|
|
Spec: batchv1.JobSpec{
|
|
BackoffLimit: &backoffLimit,
|
|
TTLSecondsAfterFinished: &ttl,
|
|
Template: corev1.PodTemplateSpec{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Labels: map[string]string{
|
|
"app.kubernetes.io/managed-by": "forgebot",
|
|
"forgebot.unkin.net/task": task.Name,
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
ServiceAccountName: pool.Spec.ServiceAccountName,
|
|
RestartPolicy: corev1.RestartPolicyNever,
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: "agent",
|
|
Image: pool.Spec.Image,
|
|
Env: env,
|
|
Resources: pool.Spec.Resources,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (r *AgentTaskReconciler) completeAPITask(ctx context.Context, task *forgebotv1alpha1.AgentTask, req models.CompleteTaskRequest) {
|
|
if r.APIURL == "" {
|
|
return
|
|
}
|
|
apiTaskID := task.Annotations["forgebot.unkin.net/api-task-id"]
|
|
if apiTaskID == "" {
|
|
return
|
|
}
|
|
logger := log.FromContext(ctx)
|
|
|
|
body, _ := json.Marshal(req)
|
|
httpReq, _ := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
r.APIURL+"/api/v1/tasks/"+apiTaskID+"/complete", bytes.NewReader(body))
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
|
|
httpClient := r.HTTPClient
|
|
if httpClient == nil {
|
|
httpClient = http.DefaultClient
|
|
}
|
|
if _, err := httpClient.Do(httpReq); err != nil {
|
|
logger.Error(err, "failed to complete API task", "apiTaskID", apiTaskID)
|
|
}
|
|
}
|
|
|
|
func (r *AgentTaskReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(&forgebotv1alpha1.AgentTask{}).
|
|
Owns(&batchv1.Job{}).
|
|
Complete(r)
|
|
}
|