package controller import ( "context" "fmt" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" forgebotv1alpha1 "git.unkin.net/unkin/forgebot/api/v1alpha1" ) type AgentTaskReconciler struct { client.Client Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=forgebot.unkin.net,resources=agenttasks,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=forgebot.unkin.net,resources=agenttasks/status,verbs=get;update;patch // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete func (r *AgentTaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) var task forgebotv1alpha1.AgentTask if err := r.Get(ctx, req.NamespacedName, &task); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } switch task.Status.Phase { case forgebotv1alpha1.TaskPending, "": return r.handlePending(ctx, &task) case forgebotv1alpha1.TaskRunning: return r.handleRunning(ctx, &task) default: logger.V(1).Info("task in terminal state", "phase", task.Status.Phase) return ctrl.Result{}, nil } } func (r *AgentTaskReconciler) handlePending(ctx context.Context, task *forgebotv1alpha1.AgentTask) (ctrl.Result, error) { logger := log.FromContext(ctx) var pool forgebotv1alpha1.AgentPool if err := r.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Spec.PoolRef}, &pool); err != nil { return ctrl.Result{}, fmt.Errorf("get pool %s: %w", task.Spec.PoolRef, err) } if pool.Status.ActiveJobs >= pool.Spec.MaxConcurrent { logger.Info("pool at capacity, requeueing", "pool", pool.Name, "active", pool.Status.ActiveJobs) return ctrl.Result{RequeueAfter: 10_000_000_000}, nil // 10s } job := r.buildJob(task, &pool) if err := ctrl.SetControllerReference(task, job, r.Scheme); err != nil { return ctrl.Result{}, err } if err := r.Create(ctx, job); err != nil { return ctrl.Result{}, fmt.Errorf("create job: %w", err) } now := metav1.Now() task.Status.Phase = forgebotv1alpha1.TaskRunning task.Status.JobName = job.Name task.Status.StartTime = &now if err := r.Status().Update(ctx, task); err != nil { return ctrl.Result{}, err } logger.Info("created job for task", "job", job.Name, "task", task.Name) return ctrl.Result{}, nil } func (r *AgentTaskReconciler) handleRunning(ctx context.Context, task *forgebotv1alpha1.AgentTask) (ctrl.Result, error) { logger := log.FromContext(ctx) var job batchv1.Job if err := r.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Status.JobName}, &job); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } if job.Status.Succeeded > 0 { now := metav1.Now() task.Status.Phase = forgebotv1alpha1.TaskSucceeded task.Status.EndTime = &now if err := r.Status().Update(ctx, task); err != nil { return ctrl.Result{}, err } logger.Info("task succeeded", "task", task.Name) return ctrl.Result{}, nil } if job.Status.Failed > 0 { now := metav1.Now() task.Status.Phase = forgebotv1alpha1.TaskFailed task.Status.EndTime = &now task.Status.Message = "job failed" if err := r.Status().Update(ctx, task); err != nil { return ctrl.Result{}, err } logger.Info("task failed", "task", task.Name) return ctrl.Result{}, nil } return ctrl.Result{RequeueAfter: 15_000_000_000}, nil // 15s } func (r *AgentTaskReconciler) buildJob(task *forgebotv1alpha1.AgentTask, pool *forgebotv1alpha1.AgentPool) *batchv1.Job { backoffLimit := int32(0) ttl := int32(3600) env := []corev1.EnvVar{ {Name: "FORGEBOT_REPO", Value: task.Spec.Repository}, {Name: "FORGEBOT_REF", Value: task.Spec.Ref}, {Name: "FORGEBOT_COMMAND", Value: task.Spec.Command}, {Name: "FORGEBOT_SKILL", Value: task.Spec.Skill}, {Name: "FORGEBOT_TASK_ID", Value: task.Name}, {Name: "FORGEBOT_MODEL", Value: pool.Spec.Model}, {Name: "FORGEBOT_BODY", Value: task.Spec.Context.Body}, {Name: "FORGEBOT_AUTHOR", Value: task.Spec.Context.Author}, {Name: "FORGEBOT_ISSUE_NUMBER", Value: fmt.Sprintf("%d", task.Spec.Context.IssueNumber)}, {Name: "FORGEBOT_PR_NUMBER", Value: fmt.Sprintf("%d", task.Spec.Context.PRNumber)}, {Name: "ANTHROPIC_BASE_URL", Value: pool.Spec.Endpoint}, } if pool.Spec.CredentialSecretRef.Name != "" { env = append(env, corev1.EnvVar{ Name: "ANTHROPIC_API_KEY", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: pool.Spec.CredentialSecretRef, Key: "api-key", }, }, }) } return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("forgebot-%s", task.Name), Namespace: task.Namespace, Labels: map[string]string{ "app.kubernetes.io/managed-by": "forgebot", "forgebot.unkin.net/task": task.Name, "forgebot.unkin.net/pool": pool.Name, "forgebot.unkin.net/command": task.Spec.Command, }, }, Spec: batchv1.JobSpec{ BackoffLimit: &backoffLimit, TTLSecondsAfterFinished: &ttl, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "app.kubernetes.io/managed-by": "forgebot", "forgebot.unkin.net/task": task.Name, }, }, Spec: corev1.PodSpec{ ServiceAccountName: pool.Spec.ServiceAccountName, RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{ { Name: "agent", Image: pool.Spec.Image, Env: env, Resources: pool.Spec.Resources, }, }, }, }, }, } } func (r *AgentTaskReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&forgebotv1alpha1.AgentTask{}). Owns(&batchv1.Job{}). Complete(r) }