Files
agent-mgr/internal/store/postgres.go
Steven Hooker e5b07cc1d8 initial agent-mgr: app builder platform MVP
Go API server + Preact UI + Claude Code adapter.
- App-centric model (ideas, not repos)
- AgentProvider interface for multi-agent support
- K8s pod lifecycle for sandboxed agent sessions
- Gitea integration (create repos, push branches)
- WebSocket streaming for live session output
- Woodpecker CI/CD pipelines (kaniko build + kubectl deploy)
2026-02-18 15:56:32 +01:00

203 lines
6.4 KiB
Go

package store
import (
"context"
"embed"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
type Store struct {
pool *pgxpool.Pool
}
func New(ctx context.Context, dsn string) (*Store, error) {
pool, err := pgxpool.New(ctx, dsn)
if err != nil {
return nil, fmt.Errorf("connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
return nil, fmt.Errorf("ping postgres: %w", err)
}
return &Store{pool: pool}, nil
}
func (s *Store) Migrate(ctx context.Context) error {
sql, err := migrationsFS.ReadFile("migrations/001_init.sql")
if err != nil {
return fmt.Errorf("read migration: %w", err)
}
_, err = s.pool.Exec(ctx, string(sql))
if err != nil {
return fmt.Errorf("run migration: %w", err)
}
return nil
}
func (s *Store) Close() {
s.pool.Close()
}
// App
type App struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Status string `json:"status"`
RepoOwner string `json:"repo_owner,omitempty"`
RepoName string `json:"repo_name,omitempty"`
PreviewURL string `json:"preview_url,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (s *Store) CreateApp(ctx context.Context, name, description string) (*App, error) {
app := &App{}
err := s.pool.QueryRow(ctx,
`INSERT INTO apps (name, description) VALUES ($1, $2)
RETURNING id, name, description, status, repo_owner, repo_name, preview_url, created_at, updated_at`,
name, description,
).Scan(&app.ID, &app.Name, &app.Description, &app.Status, &app.RepoOwner, &app.RepoName, &app.PreviewURL, &app.CreatedAt, &app.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("create app: %w", err)
}
return app, nil
}
func (s *Store) GetApp(ctx context.Context, id uuid.UUID) (*App, error) {
app := &App{}
err := s.pool.QueryRow(ctx,
`SELECT id, name, description, status, repo_owner, repo_name, preview_url, created_at, updated_at
FROM apps WHERE id = $1`, id,
).Scan(&app.ID, &app.Name, &app.Description, &app.Status, &app.RepoOwner, &app.RepoName, &app.PreviewURL, &app.CreatedAt, &app.UpdatedAt)
if err != nil {
if err == pgx.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("get app: %w", err)
}
return app, nil
}
func (s *Store) ListApps(ctx context.Context) ([]App, error) {
rows, err := s.pool.Query(ctx,
`SELECT id, name, description, status, repo_owner, repo_name, preview_url, created_at, updated_at
FROM apps ORDER BY updated_at DESC`)
if err != nil {
return nil, fmt.Errorf("list apps: %w", err)
}
defer rows.Close()
var apps []App
for rows.Next() {
var a App
if err := rows.Scan(&a.ID, &a.Name, &a.Description, &a.Status, &a.RepoOwner, &a.RepoName, &a.PreviewURL, &a.CreatedAt, &a.UpdatedAt); err != nil {
return nil, fmt.Errorf("scan app: %w", err)
}
apps = append(apps, a)
}
return apps, nil
}
func (s *Store) UpdateAppStatus(ctx context.Context, id uuid.UUID, status string) error {
_, err := s.pool.Exec(ctx,
`UPDATE apps SET status = $1, updated_at = now() WHERE id = $2`, status, id)
return err
}
func (s *Store) UpdateAppRepo(ctx context.Context, id uuid.UUID, owner, name string) error {
_, err := s.pool.Exec(ctx,
`UPDATE apps SET repo_owner = $1, repo_name = $2, updated_at = now() WHERE id = $3`, owner, name, id)
return err
}
func (s *Store) DeleteApp(ctx context.Context, id uuid.UUID) error {
_, err := s.pool.Exec(ctx, `DELETE FROM apps WHERE id = $1`, id)
return err
}
// Session
type Session struct {
ID uuid.UUID `json:"id"`
AppID uuid.UUID `json:"app_id"`
Provider string `json:"provider"`
Status string `json:"status"`
Prompt string `json:"prompt"`
Branch string `json:"branch"`
PodName string `json:"pod_name,omitempty"`
Config []byte `json:"config,omitempty"`
CostJSON []byte `json:"cost,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (s *Store) CreateSession(ctx context.Context, appID uuid.UUID, provider, prompt, branch string, config []byte) (*Session, error) {
sess := &Session{}
err := s.pool.QueryRow(ctx,
`INSERT INTO sessions (app_id, provider, prompt, branch, config)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, app_id, provider, status, prompt, branch, pod_name, config, cost_json, created_at, updated_at`,
appID, provider, prompt, branch, config,
).Scan(&sess.ID, &sess.AppID, &sess.Provider, &sess.Status, &sess.Prompt, &sess.Branch, &sess.PodName, &sess.Config, &sess.CostJSON, &sess.CreatedAt, &sess.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("create session: %w", err)
}
return sess, nil
}
func (s *Store) GetSession(ctx context.Context, id uuid.UUID) (*Session, error) {
sess := &Session{}
err := s.pool.QueryRow(ctx,
`SELECT id, app_id, provider, status, prompt, branch, pod_name, config, cost_json, created_at, updated_at
FROM sessions WHERE id = $1`, id,
).Scan(&sess.ID, &sess.AppID, &sess.Provider, &sess.Status, &sess.Prompt, &sess.Branch, &sess.PodName, &sess.Config, &sess.CostJSON, &sess.CreatedAt, &sess.UpdatedAt)
if err != nil {
if err == pgx.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("get session: %w", err)
}
return sess, nil
}
func (s *Store) ListSessionsByApp(ctx context.Context, appID uuid.UUID) ([]Session, error) {
rows, err := s.pool.Query(ctx,
`SELECT id, app_id, provider, status, prompt, branch, pod_name, config, cost_json, created_at, updated_at
FROM sessions WHERE app_id = $1 ORDER BY created_at DESC`, appID)
if err != nil {
return nil, fmt.Errorf("list sessions: %w", err)
}
defer rows.Close()
var sessions []Session
for rows.Next() {
var sess Session
if err := rows.Scan(&sess.ID, &sess.AppID, &sess.Provider, &sess.Status, &sess.Prompt, &sess.Branch, &sess.PodName, &sess.Config, &sess.CostJSON, &sess.CreatedAt, &sess.UpdatedAt); err != nil {
return nil, fmt.Errorf("scan session: %w", err)
}
sessions = append(sessions, sess)
}
return sessions, nil
}
func (s *Store) UpdateSessionStatus(ctx context.Context, id uuid.UUID, status string) error {
_, err := s.pool.Exec(ctx,
`UPDATE sessions SET status = $1, updated_at = now() WHERE id = $2`, status, id)
return err
}
func (s *Store) UpdateSessionPod(ctx context.Context, id uuid.UUID, podName string) error {
_, err := s.pool.Exec(ctx,
`UPDATE sessions SET pod_name = $1, updated_at = now() WHERE id = $2`, podName, id)
return err
}