Files
forgebot/internal/controller/providerqueue_controller.go
unkinben 8f48dd838b Add TUI kanban board, review workflow, and new task statuses
Replace task statuses (pending/running/succeeded/failed/cancelled) with
a kanban workflow: todo → in_progress → in_review → done/wontdo.

When a non-review agent task completes, the API auto-creates a child
review task and moves the parent to in_review. Only humans can move
tasks from in_review to done/wontdo via the TUI.

New components:
- cmd/tui: bubbletea kanban board with $EDITOR integration
- POST /api/v1/tasks/{id}/complete: agent completion callback
- Operator --api-url flag for completion callbacks
- ProviderQueue sets tasks to in_progress on pickup
- AgentTask reconciler calls /complete on job finish
2026-06-12 22:47:40 +10:00

184 lines
5.0 KiB
Go

package controller
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
forgebotv1alpha1 "git.unkin.net/unkin/forgebot/api/v1alpha1"
"git.unkin.net/unkin/forgebot/pkg/models"
)
// ProviderQueueReconciler reconciles ProviderQueue objects: it polls each
// queue's task API endpoint and creates AgentTask objects for the tasks that
// match a RepositoryBinding in the queue's namespace.
type ProviderQueueReconciler struct {
// Client is the embedded Kubernetes client used to get, list, create and
// update objects.
client.Client
// Scheme is not referenced by the methods visible in this file.
// NOTE(review): presumably populated from the manager at setup — confirm.
Scheme *runtime.Scheme
// HTTPClient performs the calls to the task API. When nil, Reconcile falls
// back to a client with a 10-second timeout.
HTTPClient *http.Client
}
// +kubebuilder:rbac:groups=forgebot.unkin.net,resources=providerqueues,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=forgebot.unkin.net,resources=providerqueues/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=forgebot.unkin.net,resources=agenttasks,verbs=create
// +kubebuilder:rbac:groups=forgebot.unkin.net,resources=repositorybindings,verbs=get;list;watch

// Reconcile polls the ProviderQueue's API endpoint for tasks in the "todo"
// state and creates an AgentTask for every task that
//
//   - targets a repository with a RepositoryBinding in the request namespace
//     whose ProviderQueueRef names this queue, and
//   - passes that binding's allow lists (see isAllowed).
//
// After a successful create the task is PATCHed to "in_progress" in the API.
// A failed PATCH is logged but does not undo the AgentTask.
//
// Failures to reach the API (transport errors, non-2xx responses) are recorded
// in the queue status and retried after the poll interval with a nil error.
// Failures to read or decode the response, or to list bindings, are returned
// so the controller retries with its own backoff.
func (r *ProviderQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	var queue forgebotv1alpha1.ProviderQueue
	if err := r.Get(ctx, req.NamespacedName, &queue); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Fall back to 30s when the interval is unparsable or non-positive: a
	// RequeueAfter that is not greater than zero schedules no further poll.
	pollInterval, err := time.ParseDuration(queue.Spec.PollInterval)
	if err != nil || pollInterval <= 0 {
		pollInterval = 30 * time.Second
	}

	httpClient := r.HTTPClient
	if httpClient == nil {
		httpClient = &http.Client{Timeout: 10 * time.Second}
	}

	// updateStatus stamps the poll time and persists the status. It is
	// best-effort: a failed write is logged, not returned, so it never masks
	// the outcome of the poll itself.
	updateStatus := func(lastErr string) {
		now := metav1.Now()
		queue.Status.LastPoll = &now
		queue.Status.LastError = lastErr
		if err := r.Status().Update(ctx, &queue); err != nil {
			logger.Error(err, "failed to update ProviderQueue status")
		}
	}
	// pollFailed records an API poll failure and schedules the next poll.
	pollFailed := func(pollErr error) (ctrl.Result, error) {
		updateStatus(pollErr.Error())
		logger.Error(pollErr, "failed to poll API")
		return ctrl.Result{RequeueAfter: pollInterval}, nil
	}

	// Bind the request to ctx so a cancelled reconcile aborts the poll.
	pollReq, err := http.NewRequestWithContext(ctx, http.MethodGet, queue.Spec.Endpoint+"/tasks?status=todo", nil)
	if err != nil {
		return pollFailed(err)
	}
	resp, err := httpClient.Do(pollReq)
	if err != nil {
		return pollFailed(err)
	}
	defer resp.Body.Close()
	// An error response is an API failure, not a task list to decode.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return pollFailed(fmt.Errorf("unexpected status %q from task API", resp.Status))
	}

	// Errors below are returned with an empty Result: controller-runtime
	// ignores the Result whenever the error is non-nil.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("reading task list: %w", err)
	}
	var tasks []models.Task
	if err := json.Unmarshal(body, &tasks); err != nil {
		return ctrl.Result{}, fmt.Errorf("decoding task list: %w", err)
	}

	var bindings forgebotv1alpha1.RepositoryBindingList
	if err := r.List(ctx, &bindings, client.InNamespace(req.Namespace)); err != nil {
		return ctrl.Result{}, fmt.Errorf("listing repository bindings: %w", err)
	}

	// Index the bindings that belong to this queue by repository.
	bindingMap := make(map[string]*forgebotv1alpha1.RepositoryBinding, len(bindings.Items))
	for i := range bindings.Items {
		b := &bindings.Items[i]
		if b.Spec.ProviderQueueRef == queue.Name {
			bindingMap[b.Spec.Repository] = b
		}
	}

	for _, task := range tasks {
		binding, ok := bindingMap[task.Repository]
		if !ok || !isAllowed(binding, task.Author, task.Command) {
			continue
		}

		// Name the AgentTask after at most the first eight bytes of the ID;
		// slicing unconditionally would panic on a shorter ID.
		shortID := task.ID
		if len(shortID) > 8 {
			shortID = shortID[:8]
		}

		agentTask := &forgebotv1alpha1.AgentTask{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("task-%s", shortID),
				Namespace: req.Namespace,
				Annotations: map[string]string{
					"forgebot.unkin.net/api-task-id": task.ID,
				},
			},
			Spec: forgebotv1alpha1.AgentTaskSpec{
				PoolRef:    binding.Spec.AgentPoolRef,
				Command:    task.Command,
				Skill:      resolveSkill(binding, task.Command),
				Repository: task.Repository,
				Ref:        task.Ref,
				Context: forgebotv1alpha1.TaskContext{
					IssueNumber: task.IssueNumber,
					PRNumber:    task.PRNumber,
					CommentID:   task.CommentID,
					Body:        task.Body,
					Author:      task.Author,
				},
				ExtraTools:    task.ExtraTools,
				ParentTaskRef: task.ParentTaskID,
			},
		}
		// A failed create is logged and the task is skipped without being
		// marked in_progress.
		if err := r.Create(ctx, agentTask); err != nil {
			logger.Error(err, "failed to create AgentTask", "task", task.ID)
			continue
		}

		if err := markTaskInProgress(ctx, httpClient, queue.Spec.Endpoint, task.ID, agentTask.Name); err != nil {
			logger.Error(err, "failed to update task status", "task", task.ID)
		}
		queue.Status.TasksCreated++
		logger.Info("created AgentTask", "task", agentTask.Name, "command", task.Command)
	}

	updateStatus("")
	return ctrl.Result{RequeueAfter: pollInterval}, nil
}

// markTaskInProgress PATCHes the task identified by taskID at endpoint,
// setting its status to "in_progress" and its jobName to jobName. The response
// body is always drained and closed so the underlying connection can be
// reused, and a non-2xx response is reported as an error.
func markTaskInProgress(ctx context.Context, httpClient *http.Client, endpoint, taskID, jobName string) error {
	// Encode rather than format the payload so jobName is always valid JSON.
	payload, err := json.Marshal(struct {
		Status  string `json:"status"`
		JobName string `json:"jobName"`
	}{Status: "in_progress", JobName: jobName})
	if err != nil {
		return fmt.Errorf("encoding status patch: %w", err)
	}

	patchReq, err := http.NewRequestWithContext(ctx, http.MethodPatch, endpoint+"/tasks/"+taskID, strings.NewReader(string(payload)))
	if err != nil {
		return fmt.Errorf("building status patch request: %w", err)
	}
	patchReq.Header.Set("Content-Type", "application/json")

	patchResp, err := httpClient.Do(patchReq)
	if err != nil {
		return err
	}
	defer patchResp.Body.Close()
	// Drain before closing; the response content itself is not needed.
	_, _ = io.Copy(io.Discard, patchResp.Body)

	if patchResp.StatusCode < http.StatusOK || patchResp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("unexpected status %q from task API", patchResp.Status)
	}
	return nil
}
// isAllowed reports whether a task submitted by author with the given command
// passes the binding's allow lists. An empty AllowedUsers or AllowedCommands
// list places no restriction on the corresponding value; a non-empty list must
// contain an exact match.
func isAllowed(binding *forgebotv1alpha1.RepositoryBinding, author, command string) bool {
	users := binding.Spec.AllowedUsers
	authorOK := len(users) == 0
	for _, candidate := range users {
		if candidate == author {
			authorOK = true
			break
		}
	}
	if !authorOK {
		return false
	}

	commands := binding.Spec.AllowedCommands
	commandOK := len(commands) == 0
	for _, candidate := range commands {
		if candidate == command {
			commandOK = true
			break
		}
	}
	return commandOK
}
// resolveSkill returns the skill that the binding's SkillMapping associates
// with command. When no mapping entry matches, the command itself is returned
// as the skill.
func resolveSkill(binding *forgebotv1alpha1.RepositoryBinding, command string) string {
	skill := command
	for _, mapping := range binding.Spec.SkillMapping {
		if mapping.Command != command {
			continue
		}
		skill = mapping.Skill
		break
	}
	return skill
}
// SetupWithManager registers the reconciler with mgr as the controller for
// ProviderQueue objects.
//
// NOTE(review): no event predicate is configured here, so the status updates
// written by Reconcile presumably re-trigger reconciliation ahead of the poll
// interval — confirm, and consider filtering on generation changes.
func (r *ProviderQueueReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&forgebotv1alpha1.ProviderQueue{})
	return bldr.Complete(r)
}