diff --git a/frontend/src/components/CodePane.tsx b/frontend/src/components/CodePane.tsx index 597d279..0412325 100644 --- a/frontend/src/components/CodePane.tsx +++ b/frontend/src/components/CodePane.tsx @@ -14,9 +14,11 @@ import { Button } from "@/components/ui/button"; import { codeTreeIconForNode, shouldHideSystemEntry } from "@/components/codeTreeMeta"; import { cn } from "@/lib/utils"; import { + projectGitHistory, projectGitPull, projectGitStatus, projectGitSync, + type GitHistoryCommit, type GitPullResponse, type GitStatusResponse, } from "@/services/gitSync"; @@ -143,6 +145,8 @@ export const CodePane = ({ const [gitBaselinePresent, setGitBaselinePresent] = useState(false); const [gitConflictsPending, setGitConflictsPending] = useState(false); const [gitPullResult, setGitPullResult] = useState(null); + const [latestCommit, setLatestCommit] = useState(null); + const [latestCommitError, setLatestCommitError] = useState(null); const [conflict, setConflict] = useState(false); const { ref: treeWrapRef, height: treeHeight } = useElementHeight(); @@ -156,7 +160,7 @@ export const CodePane = ({ setTreeData(rootNodes); }, [rootNodes]); - const refreshRoots = async () => { + const refreshRoots = useCallback(async () => { setSaveStatus(""); setGitStatus(""); setConflict(false); @@ -177,7 +181,15 @@ export const CodePane = ({ children: n.isDir ? [] : undefined, })) ); - }; + try { + const commits = await projectGitHistory(projectId, { limit: 1 }); + setLatestCommit(commits[0] || null); + setLatestCommitError(null); + } catch { + setLatestCommit(null); + setLatestCommitError("Git activity unavailable"); + } + }, [projectId]); const loadChildren = useCallback(async (dirPath: string) => { const { entries } = await sandboxLs(projectId, dirPath); @@ -197,7 +209,7 @@ export const CodePane = ({ requestAnimationFrame(() => treeRef.current?.open("/")); }) .catch(() => {}); - }, [projectId, loadChildren]); + }, [refreshRoots, loadChildren]); useEffect(() => { // Re-read tree contents when file visibility filters change. @@ -271,6 +283,17 @@ export const CodePane = ({ }, [projectId]); const gitStatusReqIdRef = useRef(0); + const refreshLatestCommit = useCallback(async () => { + try { + const commits = await projectGitHistory(projectId, { limit: 1 }); + setLatestCommit(commits[0] || null); + setLatestCommitError(null); + } catch { + setLatestCommit(null); + setLatestCommitError("Git activity unavailable"); + } + }, [projectId]); + const refreshGitStatus = useCallback(async () => { const reqId = ++gitStatusReqIdRef.current; try { @@ -290,6 +313,10 @@ export const CodePane = ({ void refreshGitStatus(); }, [refreshGitStatus]); + useEffect(() => { + void refreshLatestCommit(); + }, [refreshLatestCommit]); + const startGitSync = useCallback(async (commitMessage?: string) => { if (syncRunningRef.current) { syncPendingRef.current = true; @@ -309,6 +336,7 @@ export const CodePane = ({ setGitStatus(shaShort ? `Synced (${shaShort})` : "Synced"); setTimeout(() => setGitStatus(""), 2500); void refreshGitStatus(); + void refreshLatestCommit(); } catch (e: unknown) { const status = asErrorWithStatus(e).status; setGitStatus( @@ -323,7 +351,7 @@ export const CodePane = ({ void startGitSync(msg); } } - }, [projectId, refreshGitStatus]); + }, [projectId, refreshGitStatus, refreshLatestCommit]); const softRefreshTree = useCallback(() => { // Keep editor state; only reset the lazy-loaded tree so users see new files. @@ -356,6 +384,7 @@ export const CodePane = ({ await openFile(activePath); } void refreshGitStatus(); + void refreshLatestCommit(); } catch (e: unknown) { const status = asErrorWithStatus(e).status; if (status === 409) { @@ -364,7 +393,7 @@ export const CodePane = ({ } setGitStatus(`Git update failed${status ? ` (HTTP ${status})` : ""}`); } - }, [projectId, softRefreshTree, dirty, activePath, openFile, refreshGitStatus]); + }, [projectId, softRefreshTree, dirty, activePath, openFile, refreshGitStatus, refreshLatestCommit]); useEffect(() => { if (!dirty) return; @@ -654,10 +683,36 @@ export const CodePane = ({ -
- {saveStatus} - {saveStatus && gitStatus ? " \u2022 " : ""} - {gitStatus} +
+
+ {saveStatus} + {saveStatus && gitStatus ? " \u2022 " : ""} + {gitStatus} +
+
+ {latestCommit ? ( + <> + Latest: {latestCommit.author_name || latestCommit.author_email || "unknown"} ·{" "} + {latestCommit.short_sha} + {latestCommit.web_url ? ( + <> + {" "} + + open + + + ) : null} + {" · see Production tab"} + + ) : ( + latestCommitError || "Latest: n/a" + )} +
diff --git a/frontend/src/components/DatabasePane.tsx b/frontend/src/components/DatabasePane.tsx index 7597e0e..2d2f7e6 100644 --- a/frontend/src/components/DatabasePane.tsx +++ b/frontend/src/components/DatabasePane.tsx @@ -15,6 +15,7 @@ import { type NodeProps, } from "@xyflow/react"; import { MoreHorizontal, X } from "lucide-react"; +import { PaneLoading } from "@/components/PaneLoading"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { Button } from "@/components/ui/button"; @@ -1585,7 +1586,7 @@ export const DatabasePane = ({ }, [draft, projectId, schemaVersion]); if (loading || !draft) { - return
Loading database workspace...
; + return ; } const renderInspector = () => { diff --git a/frontend/src/components/DesignPane.tsx b/frontend/src/components/DesignPane.tsx index 476a391..16f7689 100644 --- a/frontend/src/components/DesignPane.tsx +++ b/frontend/src/components/DesignPane.tsx @@ -1,6 +1,7 @@ import { Button } from "@/components/ui/button"; import type { DesignState } from "@/types/design"; import { CheckCircle2, Loader2 } from "lucide-react"; +import { PaneLoading } from "@/components/PaneLoading"; type DesignPaneProps = { state: DesignState | null; @@ -50,9 +51,7 @@ export const DesignPane = ({ {loading ? ( -
- This might take a few minutes. -
+ ) : null} {error ? ( diff --git a/frontend/src/components/PaneLoading.tsx b/frontend/src/components/PaneLoading.tsx new file mode 100644 index 0000000..f0ac2ac --- /dev/null +++ b/frontend/src/components/PaneLoading.tsx @@ -0,0 +1,10 @@ +import { Loader2 } from "lucide-react"; + +export function PaneLoading({ message = "Loading..." }: { message?: string }) { + return ( +
+ +
{message}
+
+ ); +} diff --git a/frontend/src/components/ProductionPane.tsx b/frontend/src/components/ProductionPane.tsx index 162e049..0182ccb 100644 --- a/frontend/src/components/ProductionPane.tsx +++ b/frontend/src/components/ProductionPane.tsx @@ -1,7 +1,9 @@ import { useCallback, useEffect, useMemo, useState } from "react"; import { AlertTriangle, CloudOff, Rocket } from "lucide-react"; +import { PaneLoading } from "@/components/PaneLoading"; import { Button } from "@/components/ui/button"; +import { projectGitHistory, type GitHistoryCommit } from "@/services/gitSync"; import { productionCreateRelease, productionDeployRelease, @@ -35,6 +37,9 @@ export const ProductionPane = ({ projectId }: Props) => { const [status, setStatus] = useState(null); const [releases, setReleases] = useState([]); const [selectedReleaseId, setSelectedReleaseId] = useState(""); + const [historyLoading, setHistoryLoading] = useState(false); + const [historyError, setHistoryError] = useState(null); + const [history, setHistory] = useState([]); const refresh = useCallback(async () => { if (!projectId) return; @@ -57,9 +62,25 @@ export const ProductionPane = ({ projectId }: Props) => { } }, [projectId, selectedReleaseId]); + const refreshHistory = useCallback(async () => { + if (!projectId) return; + setHistoryLoading(true); + setHistoryError(null); + try { + const commits = await projectGitHistory(projectId, { limit: 30 }); + setHistory(commits); + } catch (err) { + setHistoryError(apiErrorMessage(err, "Failed to load git history.")); + setHistory([]); + } finally { + setHistoryLoading(false); + } + }, [projectId]); + useEffect(() => { void refresh(); - }, [refresh]); + void refreshHistory(); + }, [refresh, refreshHistory]); const onCreateRelease = useCallback(async () => { setBusy(true); @@ -137,7 +158,7 @@ export const ProductionPane = ({ projectId }: Props) => { ) : null} - {loading ?
Loading production data...
: null} + {loading ? : null}
Live status
@@ -182,6 +203,9 @@ export const ProductionPane = ({ projectId }: Props) => { +
@@ -222,6 +246,42 @@ export const ProductionPane = ({ projectId }: Props) => { ) : null} +
+
Git Activity
+ {historyLoading ? ( +
Loading commit history...
+ ) : historyError ? ( +
+ {historyError} +
+ ) : history.length === 0 ? ( +
No commits yet.
+ ) : ( +
+ {history.map((c) => ( +
+
{c.title || c.short_sha}
+
+ {c.author_name || c.author_email || "unknown"} ·{" "} + {c.authored_date ? new Date(c.authored_date).toLocaleString() : "unknown time"} ·{" "} + {c.short_sha} +
+ {c.web_url ? ( + + Open commit + + ) : null} +
+ ))} +
+ )} +
+ {status?.deployment?.status === "failed" ? (
diff --git a/frontend/src/components/ProjectLockModal.tsx b/frontend/src/components/ProjectLockModal.tsx new file mode 100644 index 0000000..18b6238 --- /dev/null +++ b/frontend/src/components/ProjectLockModal.tsx @@ -0,0 +1,48 @@ +import { Button } from "@/components/ui/button"; + +type Props = { + open: boolean; + lockedByEmail?: string; + lockedAt?: string; + busy?: boolean; + onTakeOver: () => void; + onClose: () => void; +}; + +const formatWhen = (raw?: string): string => { + if (!raw) return "unknown time"; + const d = new Date(raw); + if (Number.isNaN(d.getTime())) return raw; + return d.toLocaleString(); +}; + +export const ProjectLockModal = ({ + open, + lockedByEmail, + lockedAt, + busy = false, + onTakeOver, + onClose, +}: Props) => { + if (!open) return null; + return ( +
+
+
Project is currently locked
+
+ Active editor: {lockedByEmail || "unknown"} +
+ Since: {formatWhen(lockedAt)} +
+
+ + +
+
+
+ ); +}; diff --git a/frontend/src/components/ProjectMembers.tsx b/frontend/src/components/ProjectMembers.tsx new file mode 100644 index 0000000..152e2a7 --- /dev/null +++ b/frontend/src/components/ProjectMembers.tsx @@ -0,0 +1,166 @@ +import { useCallback, useEffect, useMemo, useState } from "react"; + +import { Button } from "@/components/ui/button"; +import { + type ProjectMember, + projectMembersAdd, + projectMembersList, + projectMembersRemoveByEmail, + projectMembersRemoveBySub, +} from "@/services/projectMembers"; + +type Props = { + projectId: string; +}; + +const asErrorText = (err: unknown, fallback: string): string => { + if (err && typeof err === "object") { + const rec = err as { data?: unknown; message?: unknown }; + if (rec.data && typeof rec.data === "object") { + const d = rec.data as { error?: unknown; detail?: unknown }; + if (typeof d.detail === "string" && d.detail.trim()) return d.detail; + if (typeof d.error === "string" && d.error.trim()) return d.error; + } + if (typeof rec.message === "string" && rec.message.trim()) return rec.message; + } + return fallback; +}; + +export const ProjectMembers = ({ projectId }: Props) => { + const [loading, setLoading] = useState(false); + const [busy, setBusy] = useState(false); + const [error, setError] = useState(null); + const [members, setMembers] = useState([]); + const [isOwner, setIsOwner] = useState(false); + const [inviteEmail, setInviteEmail] = useState(""); + + const refresh = useCallback(async () => { + if (!projectId) return; + setLoading(true); + setError(null); + try { + const data = await projectMembersList(projectId); + setMembers(Array.isArray(data.members) ? data.members : []); + setIsOwner(!!data.is_owner); + } catch (err) { + setError(asErrorText(err, "Failed to load members.")); + } finally { + setLoading(false); + } + }, [projectId]); + + useEffect(() => { + void refresh(); + }, [refresh]); + + const onInvite = useCallback(async () => { + const email = inviteEmail.trim().toLowerCase(); + if (!email) return; + setBusy(true); + setError(null); + try { + await projectMembersAdd(projectId, email); + setInviteEmail(""); + await refresh(); + } catch (err) { + setError(asErrorText(err, "Failed to invite member.")); + } finally { + setBusy(false); + } + }, [inviteEmail, projectId, refresh]); + + const onRemove = useCallback( + async (m: ProjectMember) => { + if (!isOwner) return; + setBusy(true); + setError(null); + try { + if (m.user_sub) { + await projectMembersRemoveBySub(projectId, m.user_sub); + } else { + await projectMembersRemoveByEmail(projectId, m.email); + } + await refresh(); + } catch (err) { + setError(asErrorText(err, "Failed to remove member.")); + } finally { + setBusy(false); + } + }, + [isOwner, projectId, refresh] + ); + + const sorted = useMemo(() => { + return [...members].sort((a, b) => { + const ra = a.role === "owner" ? 0 : 1; + const rb = b.role === "owner" ? 0 : 1; + if (ra !== rb) return ra - rb; + return a.email.localeCompare(b.email); + }); + }, [members]); + + return ( +
+
+
+
Shared Access
+
+ {isOwner ? "Manage editors for this project." : "Owner controls membership."} +
+
+ +
+ + {isOwner ? ( +
+ setInviteEmail(e.target.value)} + placeholder="teammate@example.com" + className="h-9 flex-1 rounded-md border border-input bg-background px-3 text-sm" + disabled={busy} + /> + +
+ ) : null} + + {error ?
{error}
: null} + +
+ {loading ? ( +
Loading members...
+ ) : sorted.length === 0 ? ( +
No members.
+ ) : ( + sorted.map((m) => ( +
+
+
{m.email}
+
+ {m.role === "owner" ? "Owner" : m.pending ? "Editor (pending)" : "Editor"} +
+
+ {isOwner && m.role !== "owner" ? ( + + ) : null} +
+ )) + )} +
+
+ ); +}; diff --git a/frontend/src/components/SessionClaimedModal.tsx b/frontend/src/components/SessionClaimedModal.tsx new file mode 100644 index 0000000..e63cc51 --- /dev/null +++ b/frontend/src/components/SessionClaimedModal.tsx @@ -0,0 +1,37 @@ +import { Button } from "@/components/ui/button"; + +type Props = { + open: boolean; + claimedByEmail?: string; + onReconnect: () => void; + onDismiss: () => void; + busy?: boolean; +}; + +export const SessionClaimedModal = ({ + open, + claimedByEmail, + onReconnect, + onDismiss, + busy = false, +}: Props) => { + if (!open) return null; + return ( +
+
+
Session claimed by another editor
+
+ {claimedByEmail ? `${claimedByEmail} took over this workspace.` : "Another user took over this workspace."} +
+
+ + +
+
+
+ ); +}; diff --git a/frontend/src/hooks/useMessageBus.ts b/frontend/src/hooks/useMessageBus.ts index 54cc9af..f720bb2 100644 --- a/frontend/src/hooks/useMessageBus.ts +++ b/frontend/src/hooks/useMessageBus.ts @@ -22,6 +22,7 @@ interface UseMessageBusReturn { isConnected: boolean; error: string | null; connect: () => Promise; + connectWithExtra: (extra: Record) => Promise; disconnect: () => void; send: (type: MessageType, payload: Record) => void; } @@ -126,13 +127,15 @@ export const useMessageBus = ({ }; }, []); - const connect = useCallback(async () => { + const doConnect = useCallback(async (extraInit?: Record) => { if (!messageBusRef.current) { throw new Error("MessageBus not initialized"); } // Don't reconnect after a terminal server rejection (e.g. not_found, auth error). - if (terminalCloseRef.current) { + if (extraInit) { + terminalCloseRef.current = false; + } else if (terminalCloseRef.current) { console.log("Connection terminated by server, not reconnecting"); return; } @@ -186,6 +189,9 @@ export const useMessageBus = ({ permissionMode, thinkingLevel ); + if (extraInit && webSocketRef.current) { + webSocketRef.current.setNextInitPayload(extraInit); + } lastConnectParamsRef.current = nextParams; await webSocketRef.current.connect(); } catch (err) { @@ -197,6 +203,17 @@ export const useMessageBus = ({ } }, [wsUrl, sessionId, permissionMode, thinkingLevel, isConnected]); + const connect = useCallback(async () => { + await doConnect(); + }, [doConnect]); + + const connectWithExtra = useCallback( + async (extra: Record) => { + await doConnect(extra); + }, + [doConnect] + ); + const disconnect = useCallback(() => { if (webSocketRef.current) { webSocketRef.current.disconnect(); @@ -254,6 +271,7 @@ export const useMessageBus = ({ isConnected, error, connect, + connectWithExtra, disconnect, send, }; diff --git a/frontend/src/screens/Create/index.tsx b/frontend/src/screens/Create/index.tsx index 84f5e2f..809017d 100644 --- a/frontend/src/screens/Create/index.tsx +++ b/frontend/src/screens/Create/index.tsx @@ -9,6 +9,7 @@ import { PhoneIcon, Play, RotateCcw, + Share2, TabletIcon, X, } from "lucide-react"; @@ -41,7 +42,11 @@ import { Button } from "@/components/ui/button"; import { CodePane } from "@/components/CodePane"; import { DatabasePane } from "@/components/DatabasePane"; import { DesignPane } from "@/components/DesignPane"; +import { PaneLoading } from "@/components/PaneLoading"; +import { ProjectLockModal } from "@/components/ProjectLockModal"; +import { ProjectMembers } from "@/components/ProjectMembers"; import { ProductionPane } from "@/components/ProductionPane"; +import { SessionClaimedModal } from "@/components/SessionClaimedModal"; import { designCaptureSnapshot, designCreateApproaches, @@ -282,7 +287,20 @@ const Create = () => { project_id: string; name: string; slug: string; + owner_email?: string; + is_owner?: boolean; } | null>(null); + const [showMembersPanel, setShowMembersPanel] = useState(false); + const [lockModal, setLockModal] = useState<{ + open: boolean; + lockedByEmail?: string; + lockedAt?: string; + }>({ open: false }); + const [claimedModal, setClaimedModal] = useState<{ + open: boolean; + claimedByEmail?: string; + }>({ open: false }); + const [takeoverBusy, setTakeoverBusy] = useState(false); const redirectedFromLegacy = useRef(false); const initialPromptSent = useRef(false); const [selectedDevice, setSelectedDevice] = useState< @@ -1113,6 +1131,9 @@ const Create = () => { return [...prev, entry]; }); setInitCompleted(true); + setLockModal({ open: false }); + setClaimedModal({ open: false }); + setTakeoverBusy(false); // If the server reports a pending HITL request (e.g., reconnect), surface it. const hp = message.data.hitl_pending; @@ -1132,11 +1153,30 @@ const Create = () => { project_id: proj.project_id, name: proj.name, slug: proj.slug, + owner_email: typeof proj.owner_email === "string" ? proj.owner_email : undefined, }); } }, [MessageType.ERROR]: (message: Message) => { + const code = + typeof message.data.code === "string" + ? message.data.code + : typeof message.data.error === "string" + ? message.data.error + : ""; + const lockedBy = asObj(message.data.locked_by); + if (code === "project_locked") { + setLockModal({ + open: true, + lockedByEmail: + typeof lockedBy?.email === "string" ? lockedBy.email : undefined, + lockedAt: typeof lockedBy?.at === "string" ? lockedBy.at : undefined, + }); + setTakeoverBusy(false); + return; + } + const errorObj = asObj(message.data.error); const detail = typeof message.data.detail === "string" @@ -1161,6 +1201,15 @@ const Create = () => { ]); }, + [MessageType.SESSION_CLAIMED]: (message: Message) => { + const by = asObj(message.data.claimed_by); + setClaimedModal({ + open: true, + claimedByEmail: + typeof by?.email === "string" ? by.email : undefined, + }); + }, + [MessageType.AGENT_PARTIAL]: (message: Message) => { const text = message.data.text; const id = message.id; @@ -1749,7 +1798,7 @@ const Create = () => { return combined; }, [messages, toolRunsByAssistantMsgId, reasoningByAssistantMsgId, resolvedSessionId]); - const { isConnecting, isConnected, error, connect, send } = useMessageBus({ + const { isConnecting, isConnected, error, connect, connectWithExtra, disconnect, send } = useMessageBus({ wsUrl: AGENT_CONFIG.WS_URL, sessionId: resolvedSessionId || undefined, permissionMode: AGENT_CONFIG.PERMISSION_MODE, @@ -1784,6 +1833,20 @@ const Create = () => { sendRef.current = send; }, [isConnected, send]); + const reconnectWithForceClaim = useCallback(async () => { + setTakeoverBusy(true); + try { + disconnect(); + await connectWithExtra({ force_claim: true }); + setLockModal({ open: false }); + setClaimedModal({ open: false }); + } catch (err) { + console.error("Forced reconnect failed:", err); + } finally { + setTakeoverBusy(false); + } + }, [connectWithExtra, disconnect]); + useEffect(() => { const active = !!resolvedSessionId && isConnected; window.dispatchEvent( @@ -2961,7 +3024,19 @@ const Create = () => {
{renderMainViewToggle()} {renderDeviceControls()} - +
+ + +
@@ -2975,7 +3050,7 @@ const Create = () => { onShowSystemFilesChange={setShowSystemFiles} /> ) : ( -
Loading project...
+ ) ) : mainView === "database" ? ( resolvedSessionId ? ( @@ -2985,7 +3060,7 @@ const Create = () => { onChatReply={pushAssistantMessage} /> ) : ( -
Loading project...
+ ) ) : mainView === "design" ? ( resolvedSessionId ? ( @@ -3003,13 +3078,13 @@ const Create = () => { onStop={handleDesignStop} /> ) : ( -
Loading project...
+ ) ) : mainView === "production" ? ( resolvedSessionId ? ( ) : ( -
Loading project...
+ ) ) : (
@@ -3300,6 +3375,10 @@ const Create = () => {
+ {showMembersPanel && resolvedSessionId ? ( + + ) : null} +
{
)} + + { + void reconnectWithForceClaim(); + }} + onClose={() => setLockModal({ open: false })} + /> + + { + void reconnectWithForceClaim(); + }} + onDismiss={() => setClaimedModal({ open: false })} + />
); }; diff --git a/frontend/src/screens/New/index.tsx b/frontend/src/screens/New/index.tsx index e196919..0776a38 100644 --- a/frontend/src/screens/New/index.tsx +++ b/frontend/src/screens/New/index.tsx @@ -14,6 +14,8 @@ type Project = { name: string; slug: string; template_id?: string | null; + owner_email?: string | null; + is_owner?: boolean; created_at?: string | null; updated_at?: string | null; }; @@ -309,6 +311,7 @@ const NewScreen: React.FC = () => { > {projects.map((p) => { const isDeleting = deleting.has(p.project_id); + const isOwner = !!p.is_owner; return (
{ >
{p.name}
/{p.slug}
+
+ {isOwner ? ( + Owner + ) : ( + + Shared editor + {p.owner_email ? ` · Owner: ${p.owner_email}` : ""} + + )} +
Template: {templateLabel(p.template_id)}
@@ -329,14 +342,16 @@ const NewScreen: React.FC = () => { diff --git a/frontend/src/services/gitSync.ts b/frontend/src/services/gitSync.ts index 1ccfeb2..06afe93 100644 --- a/frontend/src/services/gitSync.ts +++ b/frontend/src/services/gitSync.ts @@ -38,6 +38,17 @@ export type GitPullResponse = { detail?: string; }; +export type GitHistoryCommit = { + sha: string; + short_sha: string; + title: string; + message: string; + author_name: string; + author_email: string; + authored_date: string; + web_url: string; +}; + export const projectGitSync = async ( projectId: string, args?: { commit_message?: string } @@ -83,3 +94,26 @@ export const projectGitPull = async ( throw Object.assign(new Error("git_pull_failed"), { status: res.status, data }); return data as GitPullResponse; }; + +export const projectGitHistory = async ( + projectId: string, + args?: { limit?: number; ref?: string } +): Promise => { + const url = new URL( + `/api/projects/${encodeURIComponent(projectId)}/git/history`, + AGENT_CONFIG.HTTP_URL + ); + if (typeof args?.limit === "number" && Number.isFinite(args.limit)) { + url.searchParams.set("limit", String(Math.max(1, Math.floor(args.limit)))); + } + if (typeof args?.ref === "string" && args.ref.trim()) { + url.searchParams.set("ref", args.ref.trim()); + } + const res = await fetch(url.toString(), { credentials: "include" }); + const data = await readJson(res); + if (!res.ok) { + throw Object.assign(new Error("git_history_failed"), { status: res.status, data }); + } + const commits = (data as { commits?: GitHistoryCommit[] })?.commits; + return Array.isArray(commits) ? commits : []; +}; diff --git a/frontend/src/services/projectMembers.ts b/frontend/src/services/projectMembers.ts new file mode 100644 index 0000000..158c95d --- /dev/null +++ b/frontend/src/services/projectMembers.ts @@ -0,0 +1,102 @@ +import { AGENT_CONFIG } from "@/config/agent"; + +const agentUrl = (path: string) => new URL(path, AGENT_CONFIG.HTTP_URL).toString(); + +const readJson = async (res: Response): Promise => { + try { + return (await res.json()) as unknown; + } catch { + return null; + } +}; + +export type ProjectMember = { + user_sub: string | null; + email: string; + added_at: string | null; + added_by_sub: string | null; + role: "owner" | "editor"; + pending: boolean; +}; + +export type ProjectMembersResponse = { + project_id: string; + is_owner: boolean; + members: ProjectMember[]; +}; + +export const projectMembersList = async ( + projectId: string +): Promise => { + const url = agentUrl(`/api/projects/${encodeURIComponent(projectId)}/members`); + const res = await fetch(url, { credentials: "include" }); + const data = await readJson(res); + if (!res.ok) { + throw Object.assign(new Error("members_list_failed"), { + status: res.status, + data, + }); + } + return data as ProjectMembersResponse; +}; + +export const projectMembersAdd = async ( + projectId: string, + email: string +): Promise => { + const url = agentUrl(`/api/projects/${encodeURIComponent(projectId)}/members`); + const res = await fetch(url, { + method: "POST", + credentials: "include", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ email }), + }); + const data = await readJson(res); + if (!res.ok) { + throw Object.assign(new Error("members_add_failed"), { + status: res.status, + data, + }); + } + return ((data as { member?: ProjectMember }).member || null) as ProjectMember; +}; + +export const projectMembersRemoveBySub = async ( + projectId: string, + userSub: string +): Promise => { + const url = agentUrl( + `/api/projects/${encodeURIComponent(projectId)}/members/${encodeURIComponent(userSub)}` + ); + const res = await fetch(url, { + method: "DELETE", + credentials: "include", + }); + const data = await readJson(res); + if (!res.ok) { + throw Object.assign(new Error("members_remove_failed"), { + status: res.status, + data, + }); + } +}; + +export const projectMembersRemoveByEmail = async ( + projectId: string, + email: string +): Promise => { + const url = agentUrl( + `/api/projects/${encodeURIComponent(projectId)}/members/by-email/${encodeURIComponent(email)}` + ); + const res = await fetch(url, { + method: "DELETE", + credentials: "include", + }); + const data = await readJson(res); + if (!res.ok) { + throw Object.assign(new Error("members_remove_failed"), { + status: res.status, + data, + }); + } +}; diff --git a/frontend/src/services/websocketBus.ts b/frontend/src/services/websocketBus.ts index 2053cf9..b49c335 100644 --- a/frontend/src/services/websocketBus.ts +++ b/frontend/src/services/websocketBus.ts @@ -32,6 +32,7 @@ export class WebSocketBus { private reconnectDelay = 1000; private connectTimeoutMs = 10_000; private _terminalClose = false; + private nextInitExtra: Record | null = null; constructor(config: WebSocketBusConfig) { this.config = config; @@ -104,6 +105,10 @@ export class WebSocketBus { if (this.config.thinkingLevel) { initData.thinking_level = this.config.thinkingLevel; } + if (this.nextInitExtra) { + Object.assign(initData, this.nextInitExtra); + this.nextInitExtra = null; + } console.log("Sending INIT message with session_id:", this.config.sessionId); this.sendMessage(createMessage(MessageType.INIT, initData)); @@ -263,6 +268,10 @@ export class WebSocketBus { return this.isReady && this.ws?.readyState === WebSocket.OPEN; } + public setNextInitPayload(extra?: Record): void { + this.nextInitExtra = extra ? { ...extra } : null; + } + /** True if the server rejected the session with a terminal close code (e.g. not_found). */ public get terminalClose(): boolean { return this._terminalClose; diff --git a/frontend/src/types/messages.ts b/frontend/src/types/messages.ts index 7a90f35..87b2c04 100644 --- a/frontend/src/types/messages.ts +++ b/frontend/src/types/messages.ts @@ -13,6 +13,7 @@ export enum MessageType { HITL_REQUEST = "hitl_request", HITL_RESPONSE = "hitl_response", RUNTIME_ERROR = "runtime_error", + SESSION_CLAIMED = "session_claimed", } export enum Sender { @@ -89,6 +90,16 @@ export type MessageData = JsonObject & { exists?: boolean; permission_mode?: PermissionMode; thinking_level?: ThinkingLevel; + force_claim?: boolean; + code?: string; + locked_by?: { + email?: string; + at?: string; + }; + claimed_by?: { + email?: string; + name?: string; + }; hitl_pending?: { interrupt_id: string; request: HitlRequest }; interrupt_id?: string; diff --git a/src/agent_core.py b/src/agent_core.py index fb5b387..3aa85b5 100644 --- a/src/agent_core.py +++ b/src/agent_core.py @@ -359,6 +359,7 @@ class MessageType(Enum): HITL_REQUEST = "hitl_request" HITL_RESPONSE = "hitl_response" RUNTIME_ERROR = "runtime_error" + SESSION_CLAIMED = "session_claimed" ERROR = "error" PING = "ping" @@ -689,6 +690,27 @@ def _thinking_level_for_session(self, session_id: str) -> ThinkingLevel: return normalize_thinking_level(level) return "none" + def _git_author_for_session(self, session_id: str) -> tuple[str | None, str | None]: + init_data = self.session_data.get(session_id) + if not isinstance(init_data, dict): + return None, None + actor = init_data.get("actor") + if not isinstance(actor, dict): + return None, None + name_raw = actor.get("name") + email_raw = actor.get("email") + name = ( + str(name_raw).strip() + if isinstance(name_raw, str) and str(name_raw).strip() + else None + ) + email = ( + str(email_raw).strip() + if isinstance(email_raw, str) and str(email_raw).strip() + else None + ) + return name, email + def _conversation_history(self, session_id: str) -> list[dict[str, str]]: init_data = self.session_data.get(session_id) if not isinstance(init_data, dict): @@ -1183,6 +1205,7 @@ async def _ensure_app_environment( # noqa: C901 "_last_qa_failure", "permission_mode", "thinking_level", + "actor", "viewport_width", "viewport_height", ): @@ -1886,6 +1909,7 @@ async def send_feedback( # noqa: C901 init_data = self.session_data.get(session_id) or {} proj = init_data.get("project") if isinstance(init_data, dict) else None git = init_data.get("git") if isinstance(init_data, dict) else None + actor = init_data.get("actor") if isinstance(init_data, dict) else None if isinstance(proj, dict): slug = proj.get("slug") @@ -1922,6 +1946,13 @@ async def send_feedback( # noqa: C901 config["configurable"]["git_path_with_namespace"] = pwn if isinstance(web, str) and web: config["configurable"]["git_web_url"] = web + if isinstance(actor, dict): + actor_name = actor.get("name") + actor_email = actor.get("email") + if isinstance(actor_name, str) and actor_name.strip(): + config["configurable"]["git_author_name"] = actor_name.strip() + if isinstance(actor_email, str) and actor_email.strip(): + config["configurable"]["git_author_email"] = actor_email.strip() except Exception: logger.debug( "Failed to populate project/git metadata for session %s", @@ -2544,6 +2575,9 @@ def _is_ai_message(msg: Any) -> bool: assert self._session_manager is not None backend = self._session_manager.get_backend(session_id) policy_warnings: list[str] = [] + author_name, author_email = self._git_author_for_session( + session_id + ) def _msg(diff_stat: str, name_status: str) -> str: agent_summary = ( @@ -2571,6 +2605,8 @@ def _msg(diff_stat: str, name_status: str) -> str: repo_http_url=repo_http_url, project_slug=str(project_slug), commit_message_fn=_msg, + author_name=author_name, + author_email=author_email, ) if policy_warnings: yield Message.new( @@ -2817,6 +2853,7 @@ async def resume_hitl( # noqa: C901 init_data = self.session_data.get(session_id) or {} proj = init_data.get("project") if isinstance(init_data, dict) else None git = init_data.get("git") if isinstance(init_data, dict) else None + actor = init_data.get("actor") if isinstance(init_data, dict) else None if isinstance(proj, dict): slug = proj.get("slug") @@ -2845,6 +2882,13 @@ async def resume_hitl( # noqa: C901 config["configurable"]["git_path_with_namespace"] = pwn if isinstance(web, str) and web: config["configurable"]["git_web_url"] = web + if isinstance(actor, dict): + actor_name = actor.get("name") + actor_email = actor.get("email") + if isinstance(actor_name, str) and actor_name.strip(): + config["configurable"]["git_author_name"] = actor_name.strip() + if isinstance(actor_email, str) and actor_email.strip(): + config["configurable"]["git_author_email"] = actor_email.strip() except Exception: logger.debug( "Failed to populate project/git metadata for design session %s", @@ -3307,6 +3351,9 @@ def _is_ai_message(msg: Any) -> bool: assert self._session_manager is not None backend = self._session_manager.get_backend(session_id) policy_warnings: list[str] = [] + author_name, author_email = self._git_author_for_session( + session_id + ) def _msg(diff_stat: str, name_status: str) -> str: agent_summary = ( @@ -3334,6 +3381,8 @@ def _msg(diff_stat: str, name_status: str) -> str: repo_http_url=repo_http_url, project_slug=str(project_slug), commit_message_fn=_msg, + author_name=author_name, + author_email=author_email, ) if policy_warnings: yield Message.new( diff --git a/src/deepagents_backend/controller_graph.py b/src/deepagents_backend/controller_graph.py index 795faf1..cb0d7bd 100644 --- a/src/deepagents_backend/controller_graph.py +++ b/src/deepagents_backend/controller_graph.py @@ -389,6 +389,8 @@ async def git_sync(_state: ControllerState, config: Any) -> dict[str, Any]: # n repo_http_url = cfg.get("git_repo_http_url") project_slug = cfg.get("project_slug") or thread_id + author_name = cfg.get("git_author_name") + author_email = cfg.get("git_author_email") if not isinstance(repo_http_url, str) or not repo_http_url: if required: @@ -471,6 +473,12 @@ def _commit_message(diff_stat: str, name_status: str) -> str: repo_http_url=repo_http_url, project_slug=str(project_slug), commit_message_fn=_commit_message, + author_name=str(author_name).strip() + if isinstance(author_name, str) and author_name.strip() + else None, + author_email=str(author_email).strip() + if isinstance(author_email, str) and author_email.strip() + else None, ) return { "git_pushed": bool(pushed), diff --git a/src/gitlab/client.py b/src/gitlab/client.py index 14e0b97..9798bdf 100644 --- a/src/gitlab/client.py +++ b/src/gitlab/client.py @@ -31,6 +31,18 @@ class GitLabProject: http_url_to_repo: str +@dataclass(frozen=True) +class GitLabCommit: + sha: str + short_sha: str + title: str + message: str + author_name: str + author_email: str + authored_date: str + web_url: str + + class GitLabClient: def __init__( self, *, base_url: str, token: str, session: requests.Session | None = None @@ -146,6 +158,45 @@ def delete_project(self, project_id: int) -> bool: data = self._request("DELETE", f"/projects/{int(project_id)}") return data is not None + def list_project_commits( + self, + project_id: int, + *, + ref_name: str | None = None, + limit: int = 30, + ) -> list[GitLabCommit]: + per_page = max(1, min(100, int(limit))) + params: dict[str, Any] = {"per_page": per_page} + if isinstance(ref_name, str) and ref_name.strip(): + params["ref_name"] = ref_name.strip() + data = self._request( + "GET", + f"/projects/{int(project_id)}/repository/commits", + params=params, + ) + if not isinstance(data, list): + raise GitLabError("Unexpected GitLab commits response", payload=data) + out: list[GitLabCommit] = [] + for item in data: + if not isinstance(item, dict): + continue + sha = str(item.get("id") or "") + if not sha: + continue + out.append( + GitLabCommit( + sha=sha, + short_sha=str(item.get("short_id") or sha[:8]), + title=str(item.get("title") or ""), + message=str(item.get("message") or ""), + author_name=str(item.get("author_name") or ""), + author_email=str(item.get("author_email") or ""), + authored_date=str(item.get("authored_date") or ""), + web_url=str(item.get("web_url") or ""), + ) + ) + return out + @staticmethod def _parse_project(data: Any) -> GitLabProject: if not isinstance(data, dict): diff --git a/src/gitlab/sync.py b/src/gitlab/sync.py index 7226355..7344da6 100644 --- a/src/gitlab/sync.py +++ b/src/gitlab/sync.py @@ -870,6 +870,8 @@ def sync_sandbox_tree_to_repo( # noqa: C901 project_slug: str, commit_message: str | None = None, commit_message_fn: Any | None = None, + author_name: str | None = None, + author_email: str | None = None, branch: str | None = None, cache_dir: str | None = None, excludes: list[str] | None = None, @@ -1011,14 +1013,24 @@ def _flush() -> None: _remove_excluded_paths(repo_dir, excludes=ex) # Configure author. + eff_author_name = ( + str(author_name).strip() + if isinstance(author_name, str) and author_name.strip() + else git_commit_author_name() + ) + eff_author_email = ( + str(author_email).strip() + if isinstance(author_email, str) and author_email.strip() + else git_commit_author_email() + ) r.run( - ["git", "config", "user.name", git_commit_author_name()], + ["git", "config", "user.name", eff_author_name], cwd=repo_dir, env=env, check=True, ) r.run( - ["git", "config", "user.email", git_commit_author_email()], + ["git", "config", "user.email", eff_author_email], cwd=repo_dir, env=env, check=True, diff --git a/src/projects/store.py b/src/projects/store.py index d22b778..87be1f7 100644 --- a/src/projects/store.py +++ b/src/projects/store.py @@ -108,6 +108,29 @@ def ensure_projects_schema(client: HasuraClient) -> None: ADD COLUMN IF NOT EXISTS gitlab_path text NULL; ALTER TABLE amicable_meta.projects ADD COLUMN IF NOT EXISTS gitlab_web_url text NULL; + ALTER TABLE amicable_meta.projects + ADD COLUMN IF NOT EXISTS locked_by_sub text NULL; + ALTER TABLE amicable_meta.projects + ADD COLUMN IF NOT EXISTS locked_by_email text NULL; + ALTER TABLE amicable_meta.projects + ADD COLUMN IF NOT EXISTS locked_at timestamptz NULL; + CREATE TABLE IF NOT EXISTS amicable_meta.project_members ( + project_id text NOT NULL REFERENCES amicable_meta.projects(project_id) ON DELETE CASCADE, + user_sub text NULL, + user_email text NOT NULL, + added_at timestamptz NOT NULL DEFAULT now(), + added_by_sub text NULL, + PRIMARY KEY (project_id, user_email) + ); + CREATE INDEX IF NOT EXISTS idx_project_members_user + ON amicable_meta.project_members(user_sub); + CREATE INDEX IF NOT EXISTS idx_project_members_email + ON amicable_meta.project_members(user_email); + INSERT INTO amicable_meta.project_members (project_id, user_sub, user_email, added_at) + SELECT p.project_id, p.owner_sub, lower(p.owner_email), p.created_at + FROM amicable_meta.projects p + WHERE p.deleted_at IS NULL + ON CONFLICT DO NOTHING; CREATE TABLE IF NOT EXISTS amicable_meta.app_releases ( release_id text PRIMARY KEY, project_id text NOT NULL REFERENCES amicable_meta.projects(project_id) ON DELETE CASCADE, @@ -183,6 +206,23 @@ class Project: updated_at: str | None = None +@dataclass(frozen=True) +class ProjectMember: + project_id: str + user_sub: str | None + user_email: str + added_at: str | None = None + added_by_sub: str | None = None + + +@dataclass(frozen=True) +class ProjectLock: + project_id: str + locked_by_sub: str + locked_by_email: str + locked_at: str + + def _tuples_to_dicts(res: dict[str, Any]) -> list[dict[str, Any]]: rows = res.get("result") if not isinstance(rows, list) or len(rows) < 2: @@ -278,6 +318,249 @@ def _project_row_by_id_including_deleted( return rows[0] if rows else None +def is_project_owner(client: HasuraClient, *, project_id: str, user_sub: str) -> bool: + p = _get_project_by_id_any_owner(client, project_id=project_id) + return bool(p and p.owner_sub == user_sub) + + +def is_project_member( + client: HasuraClient, *, project_id: str, user_sub: str, user_email: str +) -> bool: + ensure_projects_schema(client) + email = (user_email or "").strip().lower() + res = client.run_sql( + f""" + SELECT 1 + FROM amicable_meta.project_members + WHERE project_id = {_sql_str(project_id)} + AND (user_sub = {_sql_str(user_sub)} OR user_email = {_sql_str(email)}) + LIMIT 1; + """.strip(), # nosec B608 + read_only=True, + ) + return bool(_tuples_to_dicts(res)) + + +def _member_from_row(r: dict[str, Any]) -> ProjectMember: + return ProjectMember( + project_id=str(r["project_id"]), + user_sub=str(r["user_sub"]) if r.get("user_sub") is not None else None, + user_email=str(r["user_email"]), + added_at=str(r.get("added_at")) if r.get("added_at") is not None else None, + added_by_sub=str(r.get("added_by_sub")) + if r.get("added_by_sub") is not None + else None, + ) + + +def _get_member_by_email( + client: HasuraClient, *, project_id: str, user_email: str +) -> ProjectMember | None: + ensure_projects_schema(client) + email = (user_email or "").strip().lower() + res = client.run_sql( + f""" + SELECT project_id, user_sub, user_email, added_at, added_by_sub + FROM amicable_meta.project_members + WHERE project_id = {_sql_str(project_id)} + AND user_email = {_sql_str(email)} + LIMIT 1; + """.strip(), # nosec B608 + read_only=True, + ) + rows = _tuples_to_dicts(res) + return _member_from_row(rows[0]) if rows else None + + +def add_project_member( + client: HasuraClient, + *, + project_id: str, + user_email: str, + user_sub: str | None = None, + added_by_sub: str | None = None, +) -> ProjectMember: + ensure_projects_schema(client) + email = (user_email or "").strip().lower() + if not email: + raise ValueError("missing_user_email") + + sub_sql = _sql_str(user_sub) if user_sub is not None else "NULL" + added_by_sql = _sql_str(added_by_sub) if added_by_sub is not None else "NULL" + client.run_sql( + f""" + INSERT INTO amicable_meta.project_members + (project_id, user_sub, user_email, added_by_sub) + VALUES + ({_sql_str(project_id)}, {sub_sql}, {_sql_str(email)}, {added_by_sql}) + ON CONFLICT (project_id, user_email) + DO UPDATE SET + user_sub = COALESCE(EXCLUDED.user_sub, amicable_meta.project_members.user_sub); + """.strip() # nosec B608 + ) + member = _get_member_by_email(client, project_id=project_id, user_email=email) + if member is None: + raise RuntimeError("failed to add project member") + return member + + +def list_project_members( + client: HasuraClient, *, project_id: str +) -> list[ProjectMember]: + ensure_projects_schema(client) + res = client.run_sql( + f""" + SELECT project_id, user_sub, user_email, added_at, added_by_sub + FROM amicable_meta.project_members + WHERE project_id = {_sql_str(project_id)} + ORDER BY added_at ASC; + """.strip(), # nosec B608 + read_only=True, + ) + return [_member_from_row(r) for r in _tuples_to_dicts(res)] + + +def remove_project_member( + client: HasuraClient, *, project_id: str, user_sub: str +) -> bool: + ensure_projects_schema(client) + project = _get_project_by_id_any_owner(client, project_id=project_id) + if project and project.owner_sub == user_sub: + return False + members = list_project_members(client, project_id=project_id) + if len(members) <= 1: + return False + if not any(m.user_sub == user_sub for m in members): + return False + client.run_sql( + f""" + DELETE FROM amicable_meta.project_members + WHERE project_id = {_sql_str(project_id)} + AND user_sub = {_sql_str(user_sub)}; + """.strip() # nosec B608 + ) + return True + + +def remove_project_member_by_email( + client: HasuraClient, *, project_id: str, user_email: str +) -> bool: + ensure_projects_schema(client) + email = (user_email or "").strip().lower() + project = _get_project_by_id_any_owner(client, project_id=project_id) + if project and (project.owner_email or "").strip().lower() == email: + return False + members = list_project_members(client, project_id=project_id) + if len(members) <= 1: + return False + if not any(m.user_email == email for m in members): + return False + client.run_sql( + f""" + DELETE FROM amicable_meta.project_members + WHERE project_id = {_sql_str(project_id)} + AND user_email = {_sql_str(email)}; + """.strip() # nosec B608 + ) + return True + + +def get_project_lock(client: HasuraClient, *, project_id: str) -> ProjectLock | None: + ensure_projects_schema(client) + res = client.run_sql( + f""" + SELECT locked_by_sub, locked_by_email, locked_at + FROM amicable_meta.projects + WHERE project_id = {_sql_str(project_id)} + AND deleted_at IS NULL + AND locked_by_sub IS NOT NULL + LIMIT 1; + """.strip(), # nosec B608 + read_only=True, + ) + rows = _tuples_to_dicts(res) + if not rows: + return None + r = rows[0] + if r.get("locked_by_sub") is None: + return None + return ProjectLock( + project_id=project_id, + locked_by_sub=str(r["locked_by_sub"]), + locked_by_email=str(r.get("locked_by_email") or ""), + locked_at=str(r.get("locked_at") or ""), + ) + + +def acquire_project_lock( + client: HasuraClient, + *, + project_id: str, + user_sub: str, + user_email: str, + force: bool = False, +) -> ProjectLock | None: + ensure_projects_schema(client) + normalized_email = (user_email or "").strip().lower() + if force: + client.run_sql( + f""" + UPDATE amicable_meta.projects + SET locked_by_sub = {_sql_str(user_sub)}, + locked_by_email = {_sql_str(normalized_email)}, + locked_at = now(), + updated_at = now() + WHERE project_id = {_sql_str(project_id)} + AND deleted_at IS NULL; + """.strip() # nosec B608 + ) + else: + client.run_sql( + f""" + UPDATE amicable_meta.projects + SET locked_by_sub = {_sql_str(user_sub)}, + locked_by_email = {_sql_str(normalized_email)}, + locked_at = now(), + updated_at = now() + WHERE project_id = {_sql_str(project_id)} + AND deleted_at IS NULL + AND (locked_by_sub IS NULL OR locked_by_sub = {_sql_str(user_sub)}); + """.strip() # nosec B608 + ) + + lock = get_project_lock(client, project_id=project_id) + if lock and lock.locked_by_sub == user_sub: + return lock + return None + + +def release_project_lock( + client: HasuraClient, *, project_id: str, user_sub: str +) -> None: + ensure_projects_schema(client) + client.run_sql( + f""" + UPDATE amicable_meta.projects + SET locked_by_sub = NULL, + locked_by_email = NULL, + locked_at = NULL, + updated_at = now() + WHERE project_id = {_sql_str(project_id)} + AND locked_by_sub = {_sql_str(user_sub)} + AND deleted_at IS NULL; + """.strip() # nosec B608 + ) + + +def is_project_lock_holder( + client: HasuraClient, *, project_id: str, user_sub: str +) -> bool: + lock = get_project_lock(client, project_id=project_id) + if lock is None: + return True + return lock.locked_by_sub == user_sub + + def get_project_any_owner(client: HasuraClient, *, project_id: str) -> Project | None: """Return project metadata without enforcing ownership. @@ -318,7 +601,9 @@ def get_project_by_id( p = _get_project_by_id_any_owner(client, project_id=project_id) if not p: return None - if p.owner_sub != owner.sub: + if not is_project_member( + client, project_id=project_id, user_sub=owner.sub, user_email=owner.email + ): return None return p @@ -341,10 +626,12 @@ def get_project_by_slug( rows = _tuples_to_dicts(res) if not rows: return None - r = rows[0] - if str(r.get("owner_sub")) != owner.sub: + p = _project_from_row(rows[0]) + if not is_project_member( + client, project_id=p.project_id, user_sub=owner.sub, user_email=owner.email + ): return None - return _project_from_row(r) + return p def get_project_by_slug_any_owner(client: HasuraClient, *, slug: str) -> Project | None: @@ -388,12 +675,19 @@ def list_projects(client: HasuraClient, *, owner: ProjectOwner) -> list[Project] ensure_projects_schema(client) res = client.run_sql( f""" - SELECT project_id, owner_sub, owner_email, name, slug, project_prompt, sandbox_id, template_id, - gitlab_project_id, gitlab_path, gitlab_web_url, - created_at, updated_at - FROM amicable_meta.projects - WHERE owner_sub = {_sql_str(owner.sub)} AND deleted_at IS NULL - ORDER BY updated_at DESC; + SELECT p.project_id, p.owner_sub, p.owner_email, p.name, p.slug, p.project_prompt, + p.sandbox_id, p.template_id, p.gitlab_project_id, p.gitlab_path, + p.gitlab_web_url, p.created_at, p.updated_at + FROM amicable_meta.projects p + WHERE p.deleted_at IS NULL + AND EXISTS ( + SELECT 1 + FROM amicable_meta.project_members pm + WHERE pm.project_id = p.project_id + AND (pm.user_sub = {_sql_str(owner.sub)} + OR pm.user_email = {_sql_str(owner.email.lower())}) + ) + ORDER BY p.updated_at DESC; """.strip(), # nosec B608 read_only=True, ) @@ -411,6 +705,9 @@ def set_project_slug( new_slug: str, ) -> Project: ensure_projects_schema(client) + p_any = _get_project_by_id_any_owner(client, project_id=project_id) + if not p_any or p_any.owner_sub != owner.sub: + raise PermissionError("project not found") client.run_sql( f""" UPDATE amicable_meta.projects @@ -434,6 +731,10 @@ def set_gitlab_metadata( gitlab_web_url: str | None, ) -> Project: ensure_projects_schema(client) + if not is_project_member( + client, project_id=project_id, user_sub=owner.sub, user_email=owner.email + ): + raise PermissionError("project not found") client.run_sql( f""" UPDATE amicable_meta.projects @@ -441,7 +742,7 @@ def set_gitlab_metadata( gitlab_path = {_sql_str(gitlab_path) if gitlab_path is not None else "NULL"}, gitlab_web_url = {_sql_str(gitlab_web_url) if gitlab_web_url is not None else "NULL"}, updated_at = now() - WHERE project_id = {_sql_str(project_id)} AND owner_sub = {_sql_str(owner.sub)} AND deleted_at IS NULL; + WHERE project_id = {_sql_str(project_id)} AND deleted_at IS NULL; """.strip() # nosec B608 ) p = get_project_by_id(client, owner=owner, project_id=project_id) @@ -501,6 +802,12 @@ def create_project( ) p = _get_project_by_id_any_owner(client, project_id=project_id) if p and p.owner_sub == owner.sub: + add_project_member( + client, + project_id=project_id, + user_sub=owner.sub, + user_email=owner.email, + ) return p raise RuntimeError("failed to allocate unique project slug") @@ -513,9 +820,19 @@ def ensure_project_for_id( ensure_projects_schema(client) existing = _get_project_by_id_any_owner(client, project_id=project_id) if existing: - if existing.owner_sub != owner.sub: - raise PermissionError("project belongs to a different user") - return existing + if is_project_member( + client, project_id=project_id, user_sub=owner.sub, user_email=owner.email + ): + return existing + if existing.owner_sub == owner.sub: + add_project_member( + client, + project_id=project_id, + user_sub=owner.sub, + user_email=owner.email, + ) + return existing + raise PermissionError("project belongs to a different user") # If a project row exists but was soft-deleted, resurrect it instead of failing # with a misleading "failed to auto-create project" error. @@ -534,6 +851,12 @@ def ensure_project_for_id( ) revived = _get_project_by_id_any_owner(client, project_id=project_id) if revived: + add_project_member( + client, + project_id=project_id, + user_sub=owner.sub, + user_email=owner.email, + ) return revived short = project_id.replace("-", "")[:8] @@ -555,6 +878,12 @@ def ensure_project_for_id( ) created = _get_project_by_id_any_owner(client, project_id=project_id) if created and created.owner_sub == owner.sub: + add_project_member( + client, + project_id=project_id, + user_sub=owner.sub, + user_email=owner.email, + ) return created raise RuntimeError("failed to auto-create project") @@ -566,9 +895,8 @@ def rename_project( ensure_projects_schema(client) base = slugify(new_name) - # Ensure project exists and is owned. - existing = get_project_by_id(client, owner=owner, project_id=project_id) - if not existing: + existing = _get_project_by_id_any_owner(client, project_id=project_id) + if not existing or existing.owner_sub != owner.sub: raise PermissionError("project not found") for attempt in range(20): @@ -596,6 +924,9 @@ def mark_project_deleted( client: HasuraClient, *, owner: ProjectOwner, project_id: str ) -> None: ensure_projects_schema(client) + existing = _get_project_by_id_any_owner(client, project_id=project_id) + if not existing or existing.owner_sub != owner.sub: + raise PermissionError("project not found") # Mark deleted first so it disappears from lists immediately. client.run_sql( f""" @@ -610,6 +941,9 @@ def hard_delete_project_row( client: HasuraClient, *, owner: ProjectOwner, project_id: str ) -> None: ensure_projects_schema(client) + existing = _project_row_by_id_including_deleted(client, project_id=project_id) + if not existing or str(existing.get("owner_sub") or "") != owner.sub: + raise PermissionError("project not found") client.run_sql( f""" DELETE FROM amicable_meta.projects diff --git a/src/runtimes/ws_server.py b/src/runtimes/ws_server.py index 9ce12e8..a9566e7 100644 --- a/src/runtimes/ws_server.py +++ b/src/runtimes/ws_server.py @@ -64,6 +64,8 @@ _bootstrap_lock_by_project: dict[str, asyncio.Lock] = {} _git_pull_lock_by_project: dict[str, asyncio.Lock] = {} _agent_run_lock_by_project: dict[str, asyncio.Lock] = {} +_ws_lock_holder_by_project: dict[str, dict[str, Any]] = {} +_ws_lock_state_guard = asyncio.Lock() # Best-effort in-memory limiter for runtime error auto-heal. _runtime_autoheal_state_by_project: dict[str, Any] = {} @@ -71,6 +73,13 @@ _naming_llm: Any = None +class ProjectLockedError(RuntimeError): + def __init__(self, *, locked_by_email: str, locked_at: str): + super().__init__("project_locked") + self.locked_by_email = locked_by_email + self.locked_at = locked_at + + async def _generate_project_name(prompt: str) -> str: """Use a small model to derive a short project name from the user prompt.""" global _naming_llm @@ -344,7 +353,7 @@ def _sanitize_user_ui_context(raw_ctx: Any) -> dict[str, Any] | None: out: dict[str, Any] = {} active_view = str(raw_ctx.get("active_view") or "").strip().lower() - if active_view in {"preview", "code", "database", "design"}: + if active_view in {"preview", "code", "database", "design", "production"}: out["active_view"] = active_view preview_path = raw_ctx.get("preview_path") @@ -409,9 +418,74 @@ def _get_owner_from_ws(ws: WebSocket) -> tuple[str, str]: return ("local", "local@example.com") +def _fallback_name_from_email(email: str) -> str: + local = (email or "").strip().split("@", 1)[0] + cleaned = re.sub(r"[-_.]+", " ", local).strip() + return cleaned[:120] if cleaned else "Amicable User" + + +def _get_actor_from_request(request: Request) -> dict[str, str]: + mode = _auth_mode() + if mode == "google": + user = (request.session or {}).get("user") # type: ignore[attr-defined] + if not isinstance(user, dict): + raise PermissionError("not authenticated") + sub = str(user.get("sub") or "").strip() + email = str(user.get("email") or "").strip() + name_raw = str(user.get("name") or "").strip() + if not sub or not email: + raise PermissionError("not authenticated") + return { + "sub": sub, + "email": email, + "name": name_raw or _fallback_name_from_email(email), + } + + return {"sub": "local", "email": "local@example.com", "name": "Local User"} + + +def _get_actor_from_ws(ws: WebSocket) -> dict[str, str]: + mode = _auth_mode() + if mode == "google": + user = getattr(ws, "session", {}).get("user") # type: ignore[attr-defined] + if not isinstance(user, dict): + raise PermissionError("not authenticated") + sub = str(user.get("sub") or "").strip() + email = str(user.get("email") or "").strip() + name_raw = str(user.get("name") or "").strip() + if not sub or not email: + raise PermissionError("not authenticated") + return { + "sub": sub, + "email": email, + "name": name_raw or _fallback_name_from_email(email), + } + return {"sub": "local", "email": "local@example.com", "name": "Local User"} + + +def _project_locked_payload(locked_by_email: str, locked_at: str) -> dict[str, Any]: + return { + "error": "project_locked", + "code": "project_locked", + "locked_by": { + "email": locked_by_email, + "at": locked_at, + }, + } + + +def _project_locked_json_response(locked_by_email: str, locked_at: str) -> JSONResponse: + return JSONResponse( + _project_locked_payload(locked_by_email=locked_by_email, locked_at=locked_at), + status_code=409, + ) + + def _project_dto(p: Any) -> dict[str, Any]: return { "project_id": getattr(p, "project_id", None), + "owner_sub": getattr(p, "owner_sub", None), + "owner_email": getattr(p, "owner_email", None), "name": getattr(p, "name", None), "slug": getattr(p, "slug", None), "project_prompt": getattr(p, "project_prompt", None), @@ -794,6 +868,8 @@ def _list_sync(): "name": p.name, "slug": p.slug, "template_id": p.template_id, + "owner_email": p.owner_email, + "is_owner": p.owner_sub == sub, "created_at": p.created_at, "updated_at": p.updated_at, } @@ -951,6 +1027,8 @@ def _get_sync(): return JSONResponse( { "project_id": p.project_id, + "owner_email": p.owner_email, + "is_owner": p.owner_sub == sub, "name": p.name, "slug": p.slug, "template_id": p.template_id, @@ -959,6 +1037,215 @@ def _get_sync(): ) +@app.get("/api/projects/{project_id}/members") +async def api_project_members(project_id: str, request: Request) -> JSONResponse: + _require_hasura() + try: + sub, email = _get_owner_from_request(request) + except PermissionError: + return JSONResponse({"error": "not_authenticated"}, status_code=401) + + from src.db.provisioning import hasura_client_from_env + from src.projects.store import ( + ProjectOwner, + get_project_by_id, + list_project_members, + ) + + def _members_sync(): + client = hasura_client_from_env() + owner = ProjectOwner(sub=sub, email=email) + p = get_project_by_id(client, owner=owner, project_id=str(project_id)) + if not p: + return {"error": "not_found"} + members = list_project_members(client, project_id=str(project_id)) + items: list[dict[str, Any]] = [] + owner_email = str(getattr(p, "owner_email", "") or "").strip().lower() + owner_sub = str(getattr(p, "owner_sub", "") or "").strip() + owner_seen = False + for m in members: + m_email = str(m.user_email or "").strip().lower() + is_owner = bool( + (m.user_sub and m.user_sub == owner_sub) + or (m_email and m_email == owner_email) + ) + if is_owner: + owner_seen = True + items.append( + { + "user_sub": m.user_sub, + "email": m_email, + "added_at": m.added_at, + "added_by_sub": m.added_by_sub, + "role": "owner" if is_owner else "editor", + "pending": m.user_sub is None, + } + ) + if not owner_seen: + items.append( + { + "user_sub": owner_sub, + "email": owner_email, + "added_at": getattr(p, "created_at", None), + "added_by_sub": owner_sub or None, + "role": "owner", + "pending": False, + } + ) + items.sort( + key=lambda r: ( + 0 if r.get("role") == "owner" else 1, + str(r.get("email") or ""), + ) + ) + return {"project": p, "members": items} + + out = await asyncio.to_thread(_members_sync) + if isinstance(out, dict) and out.get("error") == "not_found": + return JSONResponse({"error": "not_found"}, status_code=404) + if not isinstance(out, dict): + return JSONResponse({"error": "members_fetch_failed"}, status_code=500) + p = out.get("project") + is_owner = bool(getattr(p, "owner_sub", None) == sub) + return JSONResponse( + { + "project_id": project_id, + "is_owner": is_owner, + "members": out.get("members") or [], + }, + status_code=200, + ) + + +@app.post("/api/projects/{project_id}/members") +async def api_project_members_add(project_id: str, request: Request) -> JSONResponse: + _require_hasura() + body: Any + try: + body = await request.json() + except (ValueError, TypeError): + body = {} + if not isinstance(body, dict): + body = {} + email_raw = str(body.get("email") or "").strip().lower() + if not email_raw: + return JSONResponse({"error": "missing_email"}, status_code=400) + + try: + p = await _ensure_project_owner_access_async(request, project_id=project_id) + actor = _get_actor_from_request(request) + except PermissionError as e: + code = ( + 401 + if str(e) == "not_authenticated" + else 404 + if str(e) == "not_found" + else 403 + ) + return JSONResponse({"error": str(e)}, status_code=code) + + from src.db.provisioning import hasura_client_from_env + from src.projects.store import add_project_member + + def _add_sync(): + client = hasura_client_from_env() + added = add_project_member( + client, + project_id=str(project_id), + user_email=email_raw, + added_by_sub=actor["sub"], + ) + owner_email = str(getattr(p, "owner_email", "") or "").strip().lower() + owner_sub = str(getattr(p, "owner_sub", "") or "").strip() + is_owner = bool( + (added.user_sub and added.user_sub == owner_sub) + or (added.user_email and added.user_email == owner_email) + ) + return { + "user_sub": added.user_sub, + "email": added.user_email, + "added_at": added.added_at, + "added_by_sub": added.added_by_sub, + "role": "owner" if is_owner else "editor", + "pending": added.user_sub is None, + } + + item = await asyncio.to_thread(_add_sync) + return JSONResponse({"member": item}, status_code=200) + + +@app.delete("/api/projects/{project_id}/members/{user_sub}") +async def api_project_members_remove_by_sub( + project_id: str, user_sub: str, request: Request +) -> JSONResponse: + _require_hasura() + target_sub = (user_sub or "").strip() + if not target_sub: + return JSONResponse({"error": "missing_user_sub"}, status_code=400) + try: + p = await _ensure_project_owner_access_async(request, project_id=project_id) + except PermissionError as e: + code = ( + 401 + if str(e) == "not_authenticated" + else 404 + if str(e) == "not_found" + else 403 + ) + return JSONResponse({"error": str(e)}, status_code=code) + if target_sub == str(getattr(p, "owner_sub", "") or "").strip(): + return JSONResponse({"error": "cannot_remove_owner"}, status_code=400) + + from src.db.provisioning import hasura_client_from_env + from src.projects.store import remove_project_member + + ok = await asyncio.to_thread( + remove_project_member, + hasura_client_from_env(), + project_id=str(project_id), + user_sub=target_sub, + ) + if not ok: + return JSONResponse({"error": "member_not_removed"}, status_code=400) + return JSONResponse({"removed": True}, status_code=200) + + +@app.delete("/api/projects/{project_id}/members/by-email/{user_email:path}") +async def api_project_members_remove_by_email( + project_id: str, user_email: str, request: Request +) -> JSONResponse: + _require_hasura() + target_email = (user_email or "").strip().lower() + if not target_email: + return JSONResponse({"error": "missing_user_email"}, status_code=400) + try: + p = await _ensure_project_owner_access_async(request, project_id=project_id) + except PermissionError as e: + code = ( + 401 + if str(e) == "not_authenticated" + else 404 + if str(e) == "not_found" + else 403 + ) + return JSONResponse({"error": str(e)}, status_code=code) + if target_email == str(getattr(p, "owner_email", "") or "").strip().lower(): + return JSONResponse({"error": "cannot_remove_owner"}, status_code=400) + + from src.db.provisioning import hasura_client_from_env + from src.projects.store import remove_project_member_by_email + + ok = await asyncio.to_thread( + remove_project_member_by_email, + hasura_client_from_env(), + project_id=str(project_id), + user_email=target_email, + ) + if not ok: + return JSONResponse({"error": "member_not_removed"}, status_code=400) + return JSONResponse({"removed": True}, status_code=200) + + @app.patch("/api/projects/{project_id}") async def api_rename_project(project_id: str, request: Request) -> JSONResponse: _require_hasura() @@ -1018,12 +1305,26 @@ def _rename_git_sync(): async def api_git_sync_project(project_id: str, request: Request) -> JSONResponse: # noqa: C901 # This endpoint is intended for the browser Code view: after a file save in the # sandbox FS, persist the full sandbox tree back to GitLab (commit + push). + from src.gitlab.config import git_sync_enabled, git_sync_required + + if not git_sync_enabled(): + return JSONResponse({"error": "git_sync_disabled"}, status_code=409) + if not _hasura_enabled(): return JSONResponse({"error": "hasura_not_configured"}, status_code=400) try: - sub, email = _get_owner_from_request(request) - except PermissionError: - return JSONResponse({"error": "not_authenticated"}, status_code=401) + _p, actor = await _ensure_edit_lock_holder_async( + request, project_id=str(project_id) + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) + except PermissionError as e: + code = 401 if str(e) == "not_authenticated" else 404 + return JSONResponse({"error": str(e)}, status_code=code) + + sub = actor["sub"] + email = actor["email"] + actor_name = actor["name"] body: Any try: @@ -1040,11 +1341,6 @@ async def api_git_sync_project(project_id: str, request: Request) -> JSONRespons else None ) - from src.gitlab.config import git_sync_enabled, git_sync_required - - if not git_sync_enabled(): - return JSONResponse({"error": "git_sync_disabled"}, status_code=409) - def _load_sync(): from src.db.provisioning import hasura_client_from_env from src.gitlab.integration import ensure_gitlab_repo_for_project @@ -1112,6 +1408,8 @@ def _load_sync(): repo_http_url=str(repo_url), project_slug=slug_for_commit, commit_message=commit_message, + author_name=actor_name, + author_email=email, ) return JSONResponse( { @@ -1239,8 +1537,10 @@ def _load_sync(): ) -@app.post("/api/projects/{project_id}/git/pull") -async def api_git_pull_project(project_id: str, request: Request) -> JSONResponse: # noqa: C901 +@app.get("/api/projects/{project_id}/git/history") +async def api_git_history_project( # noqa: C901 + project_id: str, request: Request, limit: int = 30, ref: str | None = None +) -> JSONResponse: if not _hasura_enabled(): return JSONResponse({"error": "hasura_not_configured"}, status_code=400) try: @@ -1253,6 +1553,108 @@ async def api_git_pull_project(project_id: str, request: Request) -> JSONRespons if not git_sync_enabled(): return JSONResponse({"error": "git_sync_disabled"}, status_code=409) + normalized_limit = max(1, min(100, int(limit or 30))) + ref_name = str(ref or "").strip() or None + + def _load_sync(): + from src.db.provisioning import hasura_client_from_env + from src.gitlab.integration import ensure_gitlab_repo_for_project + from src.projects.store import ProjectOwner, get_project_by_id + + client = hasura_client_from_env() + owner = ProjectOwner(sub=sub, email=email) + project = get_project_by_id(client, owner=owner, project_id=str(project_id)) + if not project: + return {"error": "not_found"} + try: + project, git = ensure_gitlab_repo_for_project( + client, owner=owner, project=project + ) + except Exception as e: + return {"error": "gitlab_error", "detail": str(e)} + return {"ok": True, "project": project, "git": git} + + loaded = await asyncio.to_thread(_load_sync) + if not isinstance(loaded, dict) or not loaded.get("ok"): + if isinstance(loaded, dict) and loaded.get("error") == "not_found": + return JSONResponse({"error": "not_found"}, status_code=404) + if isinstance(loaded, dict) and loaded.get("error") == "gitlab_error": + detail = str(loaded.get("detail") or "") + status = 503 if ("GITLAB_TOKEN" in detail or "required" in detail) else 502 + return JSONResponse( + {"error": "gitlab_error", "detail": detail}, status_code=status + ) + return JSONResponse({"error": "gitlab_error"}, status_code=502) + + project = loaded["project"] + git = loaded["git"] + gitlab_project_id = getattr(project, "gitlab_project_id", None) + if not isinstance(gitlab_project_id, int) and isinstance(git, dict): + raw_id = git.get("gitlab_project_id") + if isinstance(raw_id, int): + gitlab_project_id = raw_id + elif isinstance(raw_id, str) and raw_id.isdigit(): + gitlab_project_id = int(raw_id) + if not isinstance(gitlab_project_id, int): + return JSONResponse({"error": "git_history_unavailable"}, status_code=409) + + try: + from src.gitlab.client import GitLabClient + + commits = await asyncio.to_thread( + GitLabClient.from_env().list_project_commits, + int(gitlab_project_id), + ref_name=ref_name, + limit=normalized_limit, + ) + except Exception as e: + return JSONResponse( + {"error": "git_history_failed", "detail": str(e)}, + status_code=502, + ) + + return JSONResponse( + { + "commits": [ + { + "sha": c.sha, + "short_sha": c.short_sha, + "title": c.title, + "message": c.message, + "author_name": c.author_name, + "author_email": c.author_email, + "authored_date": c.authored_date, + "web_url": c.web_url, + } + for c in commits + ] + }, + status_code=200, + ) + + +@app.post("/api/projects/{project_id}/git/pull") +async def api_git_pull_project(project_id: str, request: Request) -> JSONResponse: # noqa: C901 + from src.gitlab.config import git_sync_enabled + + if not git_sync_enabled(): + return JSONResponse({"error": "git_sync_disabled"}, status_code=409) + + if not _hasura_enabled(): + return JSONResponse({"error": "hasura_not_configured"}, status_code=400) + try: + _p, actor = await _ensure_edit_lock_holder_async( + request, project_id=str(project_id) + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) + except PermissionError as e: + code = 401 if str(e) == "not_authenticated" else 404 + return JSONResponse({"error": str(e)}, status_code=code) + + sub = actor["sub"] + email = actor["email"] + def _load_sync(): from src.db.provisioning import hasura_client_from_env from src.gitlab.integration import ensure_gitlab_repo_for_project @@ -1709,6 +2111,55 @@ def _ensure_project_access(request: Request, *, project_id: str): return p +def _ensure_project_owner_access(request: Request, *, project_id: str): + _require_hasura() + try: + sub, email = _get_owner_from_request(request) + except PermissionError: + raise PermissionError("not_authenticated") from None + + from src.db.provisioning import hasura_client_from_env + from src.projects.store import ( + ProjectOwner, + get_project_by_id, + is_project_owner, + ) + + client = hasura_client_from_env() + owner = ProjectOwner(sub=sub, email=email) + p = get_project_by_id(client, owner=owner, project_id=str(project_id)) + if not p: + raise PermissionError("not_found") + if not is_project_owner(client, project_id=str(project_id), user_sub=sub): + raise PermissionError("forbidden") + return p + + +def _ensure_edit_lock_holder(request: Request, *, project_id: str): + if not _hasura_enabled(): + raise RuntimeError("hasura_not_configured") + actor = _get_actor_from_request(request) + + from src.db.provisioning import hasura_client_from_env + from src.projects.store import ProjectOwner, get_project_by_id, get_project_lock + + client = hasura_client_from_env() + owner = ProjectOwner(sub=actor["sub"], email=actor["email"]) + p = get_project_by_id(client, owner=owner, project_id=str(project_id)) + if not p: + raise PermissionError("not_found") + # Some tests inject a minimal placeholder client; skip lock inspection there. + if not hasattr(client, "run_sql"): + return p, actor + lock = get_project_lock(client, project_id=str(project_id)) + if lock is not None and lock.locked_by_sub != actor["sub"]: + raise ProjectLockedError( + locked_by_email=lock.locked_by_email, + locked_at=lock.locked_at, + ) + return p, actor + + async def _ensure_project_access_async(request: Request, *, project_id: str): # Must not block the event loop (Hasura/GitLab calls are synchronous). return await asyncio.to_thread( @@ -1716,6 +2167,18 @@ async def _ensure_project_access_async(request: Request, *, project_id: str): ) +async def _ensure_project_owner_access_async(request: Request, *, project_id: str): + return await asyncio.to_thread( + _ensure_project_owner_access, request, project_id=project_id + ) + + +async def _ensure_edit_lock_holder_async(request: Request, *, project_id: str): + return await asyncio.to_thread( + _ensure_edit_lock_holder, request, project_id=project_id + ) + + def _get_agent() -> Agent: global _agent if _agent is None: @@ -2071,10 +2534,19 @@ async def api_db_schema_apply(project_id: str, request: Request) -> JSONResponse return JSONResponse({"error": "hasura_not_configured"}, status_code=400) try: - sub, email = _get_owner_from_request(request) - except PermissionError: - return JSONResponse({"error": "not_authenticated"}, status_code=401) - + _p, actor = await _ensure_edit_lock_holder_async( + request, project_id=str(project_id) + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) + except PermissionError as e: + code = 401 if str(e) == "not_authenticated" else 404 + return JSONResponse({"error": str(e)}, status_code=code) + + sub = actor["sub"] + email = actor["email"] + actor_name = actor["name"] + body: Any try: body = await request.json() @@ -2290,6 +2762,8 @@ def _ensure_git_sync(): repo_http_url=str(repo_url), project_slug=slug_for_commit, commit_message=commit_message, + author_name=actor_name, + author_email=email, ) git_sync_result.update( { @@ -2791,9 +3265,13 @@ async def api_sandbox_read(project_id: str, path: str, request: Request): @app.put("/api/sandbox/{project_id}/write") -async def api_sandbox_write(project_id: str, request: Request) -> JSONResponse: +async def api_sandbox_write(project_id: str, request: Request) -> JSONResponse: # noqa: C901 try: - proj = await _ensure_project_access_async(request, project_id=project_id) + proj, _actor = await _ensure_edit_lock_holder_async( + request, project_id=project_id + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) except PermissionError as e: code = 401 if str(e) == "not_authenticated" else 404 return JSONResponse({"error": str(e)}, status_code=code) @@ -2849,7 +3327,11 @@ async def api_sandbox_write(project_id: str, request: Request) -> JSONResponse: @app.post("/api/sandbox/{project_id}/mkdir") async def api_sandbox_mkdir(project_id: str, request: Request) -> JSONResponse: try: - proj = await _ensure_project_access_async(request, project_id=project_id) + proj, _actor = await _ensure_edit_lock_holder_async( + request, project_id=project_id + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) except PermissionError as e: code = 401 if str(e) == "not_authenticated" else 404 return JSONResponse({"error": str(e)}, status_code=code) @@ -2888,9 +3370,13 @@ async def api_sandbox_mkdir(project_id: str, request: Request) -> JSONResponse: @app.post("/api/sandbox/{project_id}/create") -async def api_sandbox_create(project_id: str, request: Request) -> JSONResponse: +async def api_sandbox_create(project_id: str, request: Request) -> JSONResponse: # noqa: C901 try: - proj = await _ensure_project_access_async(request, project_id=project_id) + proj, _actor = await _ensure_edit_lock_holder_async( + request, project_id=project_id + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) except PermissionError as e: code = 401 if str(e) == "not_authenticated" else 404 return JSONResponse({"error": str(e)}, status_code=code) @@ -2942,7 +3428,11 @@ async def api_sandbox_create(project_id: str, request: Request) -> JSONResponse: @app.post("/api/sandbox/{project_id}/rename") async def api_sandbox_rename(project_id: str, request: Request) -> JSONResponse: try: - proj = await _ensure_project_access_async(request, project_id=project_id) + proj, _actor = await _ensure_edit_lock_holder_async( + request, project_id=project_id + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) except PermissionError as e: code = 401 if str(e) == "not_authenticated" else 404 return JSONResponse({"error": str(e)}, status_code=code) @@ -2988,7 +3478,11 @@ async def api_sandbox_rm( project_id: str, request: Request, path: str, recursive: int = 0 ) -> JSONResponse: try: - proj = await _ensure_project_access_async(request, project_id=project_id) + proj, _actor = await _ensure_edit_lock_holder_async( + request, project_id=project_id + ) + except ProjectLockedError as e: + return _project_locked_json_response(e.locked_by_email, e.locked_at) except PermissionError as e: code = 401 if str(e) == "not_authenticated" else 404 return JSONResponse({"error": str(e)}, status_code=code) @@ -3435,7 +3929,9 @@ async def _ensure_project_context_for_session( ) -> None: """Authorize, ensure agent session exists, and persist project/git metadata.""" _require_hasura() - sub, email = _get_owner_from_ws(ws) + actor = _get_actor_from_ws(ws) + sub = actor["sub"] + email = actor["email"] from src.db.provisioning import hasura_client_from_env from src.gitlab.integration import ensure_gitlab_repo_for_project @@ -3468,6 +3964,7 @@ def _ensure_git_sync(): if isinstance(init_data, dict): init_data["project"] = _project_dto(project) init_data["git"] = git + init_data["actor"] = actor agent.session_data[session_id] = init_data @@ -3484,6 +3981,33 @@ def _cleanup_project_runtime_state(session_id: str) -> None: _git_pull_lock_by_project.pop(session_id, None) _agent_run_lock_by_project.pop(session_id, None) _runtime_autoheal_state_by_project.pop(session_id, None) + _ws_lock_holder_by_project.pop(session_id, None) + + +async def _set_ws_lock_holder( + project_id: str, *, ws: WebSocket, user_sub: str, user_email: str +) -> None: + async with _ws_lock_state_guard: + _ws_lock_holder_by_project[project_id] = { + "ws": ws, + "user_sub": user_sub, + "user_email": user_email, + } + + +async def _get_ws_lock_holder(project_id: str) -> dict[str, Any] | None: + async with _ws_lock_state_guard: + holder = _ws_lock_holder_by_project.get(project_id) + if not isinstance(holder, dict): + return None + return dict(holder) + + +async def _clear_ws_lock_holder_if_ws(project_id: str, *, ws: WebSocket) -> None: + async with _ws_lock_state_guard: + holder = _ws_lock_holder_by_project.get(project_id) + if isinstance(holder, dict) and holder.get("ws") is ws: + _ws_lock_holder_by_project.pop(project_id, None) async def _restore_pending_hitl_if_available(agent: Agent, session_id: str) -> None: @@ -3512,605 +4036,826 @@ async def _handle_ws(ws: WebSocket) -> None: # noqa: C901 if _agent is None: _agent = Agent() agent = _agent + held_editor_locks: dict[str, dict[str, str]] = {} - while True: - try: - raw = await ws.receive_text() - except WebSocketDisconnect: - return + try: + while True: + try: + raw = await ws.receive_text() + except WebSocketDisconnect: + return + + msg: dict[str, Any] + try: + msg = json.loads(raw) + except (ValueError, TypeError): + continue - msg: dict[str, Any] - try: - msg = json.loads(raw) - except (ValueError, TypeError): - continue + mtype = msg.get("type") + raw_data = msg.get("data") + data = raw_data if isinstance(raw_data, dict) else {} - mtype = msg.get("type") - raw_data = msg.get("data") - data = raw_data if isinstance(raw_data, dict) else {} + traceparent, tracestate = _ws_trace_headers(data) + ws_session_id = ( + data.get("session_id") + if isinstance(data.get("session_id"), str) and data.get("session_id") + else None + ) + trace_scope = ws_message_span( + name="ws.message", + message_type=str(mtype or "unknown"), + session_id=ws_session_id, + traceparent=traceparent, + tracestate=tracestate, + attributes={ + "ws.has_traceparent": bool(traceparent), + "ws.has_tracestate": bool(tracestate), + "ws.has_text": isinstance(data.get("text"), str) + and bool(data.get("text")), + "ws.has_content_blocks": isinstance( + data.get("content_blocks"), list + ), + "ws.has_error_payload": isinstance(data.get("error"), dict), + "ws.client": str(getattr(ws, "client", "")), + }, + ) + trace_scope.__enter__() + try: + if mtype == MessageType.INIT.value: + session_id = data.get("session_id") + if not session_id: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "missing_session_id"}, + session_id="", + ).to_dict() + ) + await ws.close(code=1008) + return - traceparent, tracestate = _ws_trace_headers(data) - ws_session_id = ( - data.get("session_id") - if isinstance(data.get("session_id"), str) and data.get("session_id") - else None - ) - trace_scope = ws_message_span( - name="ws.message", - message_type=str(mtype or "unknown"), - session_id=ws_session_id, - traceparent=traceparent, - tracestate=tracestate, - attributes={ - "ws.has_traceparent": bool(traceparent), - "ws.has_tracestate": bool(tracestate), - "ws.has_text": isinstance(data.get("text"), str) - and bool(data.get("text")), - "ws.has_content_blocks": isinstance(data.get("content_blocks"), list), - "ws.has_error_payload": isinstance(data.get("error"), dict), - "ws.client": str(getattr(ws, "client", "")), - }, - ) - trace_scope.__enter__() - try: - if mtype == MessageType.INIT.value: - session_id = data.get("session_id") - if not session_id: - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "missing_session_id"}, - session_id="", - ).to_dict() + requested_permission_mode = ( + data.get("permission_mode") + if isinstance(data.get("permission_mode"), str) + else None ) - await ws.close(code=1008) - return - - requested_permission_mode = ( - data.get("permission_mode") - if isinstance(data.get("permission_mode"), str) - else None - ) - requested_thinking_level = ( - data.get("thinking_level") - if isinstance(data.get("thinking_level"), str) - else None - ) - - project = None - git = None - try: - _require_hasura() - sub, email = _get_owner_from_ws(ws) - from src.db.provisioning import hasura_client_from_env - from src.gitlab.integration import ensure_gitlab_repo_for_project - from src.projects.store import ProjectOwner, get_project_by_id - - def _load_sync( - *, sub_: str, email_: str, session_id_: str - ) -> tuple[Any, Any]: - client = hasura_client_from_env() - owner = ProjectOwner(sub=sub_, email=email_) - project = get_project_by_id( - client, owner=owner, project_id=str(session_id_) + requested_thinking_level = ( + data.get("thinking_level") + if isinstance(data.get("thinking_level"), str) + else None + ) + force_claim = bool(data.get("force_claim", False)) + + project = None + git = None + actor: dict[str, str] = {} + sub = "" + email = "" + actor_name = "" + try: + _require_hasura() + actor = _get_actor_from_ws(ws) + sub = actor["sub"] + email = actor["email"] + actor_name = actor["name"] + from src.db.provisioning import hasura_client_from_env + from src.gitlab.integration import ( + ensure_gitlab_repo_for_project, ) - if not project: - raise PermissionError("not_found") - project2, git2 = ensure_gitlab_repo_for_project( - client, owner=owner, project=project + from src.projects.store import ( + ProjectLock, + ProjectOwner, + acquire_project_lock, + get_project_by_id, + get_project_lock, + release_project_lock, ) - return project2, git2 - project, git = await asyncio.to_thread( - _load_sync, - sub_=sub, - email_=email, - session_id_=str(session_id), - ) - except PermissionError as e: - # Most common cause: user mismatch between cookie/session and session_id. - logger.warning( - "WS INIT permission denied: %s (session_id=%s client=%s sub=%s)", - e, - session_id, - getattr(ws, "client", None), - sub if "sub" in locals() else None, - ) - await ws.close(code=1008) - return - except Exception as e: - logger.exception( - "WS INIT failed (session_id=%s client=%s)", - session_id, - getattr(ws, "client", None), - ) - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "project_init_failed", "detail": str(e)}, - session_id=session_id, - ).to_dict() - ) - await ws.close(code=1011) - return - template_id = ( - getattr(project, "template_id", None) - if project is not None - else None - ) - project_slug = ( - getattr(project, "slug", None) if project is not None else None - ) - try: - exists = await agent.init( - session_id=session_id, - template_id=template_id, - slug=project_slug, - ) - conversation_history = await agent.ensure_ws_chat_history_ready( - str(session_id) - ) - except ChatHistoryPersistenceError as e: - logger.warning( - "WS INIT chat history unavailable (session_id=%s): %s", - session_id, - e.detail, - ) - conversation_history = [] - except Exception as e: - logger.exception( - "WS INIT agent init failed (session_id=%s)", session_id - ) - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "project_init_failed", "detail": str(e)}, - session_id=session_id, - ).to_dict() - ) - await ws.close(code=1011) - return - agent.set_session_controls( - session_id, - permission_mode=requested_permission_mode, - thinking_level=requested_thinking_level, - ) - init_data = agent.session_data[session_id] - init_data["exists"] = exists - init_data["conversation_history"] = conversation_history - if project is not None: - init_data["project"] = _project_dto(project) - init_data["template_id"] = getattr(project, "template_id", None) - if git is not None: - init_data["git"] = git - - # Baseline bootstrap commit: only when the remote branch doesn't exist yet. - try: - from src.gitlab.commit_message import ( - deterministic_bootstrap_commit_message, - ) - from src.gitlab.config import git_sync_enabled - from src.gitlab.sync import bootstrap_repo_if_empty + def _load_sync( + *, sub_: str, email_: str, session_id_: str + ) -> tuple[Any, ProjectLock | None]: + client = hasura_client_from_env() + owner = ProjectOwner(sub=sub_, email=email_) + project = get_project_by_id( + client, owner=owner, project_id=str(session_id_) + ) + if not project: + raise PermissionError("not_found") + current_lock = get_project_lock( + client, project_id=str(session_id_) + ) + return project, current_lock - if git_sync_enabled() and isinstance(git, dict): - repo_url = git.get("http_url_to_repo") or git.get( - "repo_http_url" + project, current_lock = await asyncio.to_thread( + _load_sync, + sub_=sub, + email_=email, + session_id_=str(session_id), ) - if isinstance(repo_url, str) and repo_url: - lock = _bootstrap_lock_by_project.get(session_id) + + if ( + current_lock is not None + and current_lock.locked_by_sub != sub + ): + if not force_claim: + await ws.send_json( + Message.new( + MessageType.ERROR, + _project_locked_payload( + locked_by_email=current_lock.locked_by_email, + locked_at=current_lock.locked_at, + ), + session_id=str(session_id), + ).to_dict() + ) + await ws.close(code=1008) + return + + prev_holder = await _get_ws_lock_holder(str(session_id)) + holder_ws = ( + prev_holder.get("ws") + if isinstance(prev_holder, dict) + else None + ) + holder_sub = ( + str(prev_holder.get("user_sub") or "") + if isinstance(prev_holder, dict) + else "" + ) + if ( + holder_ws is not None + and holder_ws is not ws + and holder_sub + and holder_sub != sub + ): + with contextlib.suppress(Exception): + await holder_ws.send_json( + Message.new( + MessageType.SESSION_CLAIMED, + { + "project_id": str(session_id), + "claimed_by": { + "email": email, + "name": actor_name, + }, + }, + session_id=str(session_id), + ).to_dict() + ) + with contextlib.suppress(Exception): + await holder_ws.close(code=1008) + with contextlib.suppress(Exception): + await _clear_ws_lock_holder_if_ws( + str(session_id), ws=holder_ws + ) + + def _lock_and_git_sync( + *, sub_: str, email_: str, session_id_: str, force_: bool + ) -> tuple[Any, Any]: + client = hasura_client_from_env() + owner = ProjectOwner(sub=sub_, email=email_) + project_locked = get_project_by_id( + client, owner=owner, project_id=str(session_id_) + ) + if not project_locked: + raise PermissionError("not_found") + lock = acquire_project_lock( + client, + project_id=str(session_id_), + user_sub=sub_, + user_email=email_, + force=force_, + ) if lock is None: - lock = asyncio.Lock() - _bootstrap_lock_by_project[session_id] = lock - async with lock: - assert agent._session_manager is not None - backend = agent._session_manager.get_backend(session_id) - slug_for_commit = ( - str(getattr(project, "slug", "") or "") - if project is not None - else "" - ).strip() or str(session_id) - commit_msg = deterministic_bootstrap_commit_message( - project_slug=slug_for_commit, - template_id=str(template_id) - if template_id - else None, - project_name=str(getattr(project, "name", "") or "") - if project is not None - else None, - project_prompt=str( - getattr(project, "project_prompt", "") or "" + latest = get_project_lock( + client, project_id=str(session_id_) + ) + if latest is not None and latest.locked_by_sub != sub_: + raise ProjectLockedError( + locked_by_email=latest.locked_by_email, + locked_at=latest.locked_at, ) - if project is not None - else None, + raise RuntimeError("project_lock_acquire_failed") + try: + return ensure_gitlab_repo_for_project( + client, owner=owner, project=project_locked ) - await asyncio.to_thread( - bootstrap_repo_if_empty, - backend, - repo_http_url=str(repo_url), - project_slug=slug_for_commit, - commit_message=commit_msg, + except Exception: + release_project_lock( + client, + project_id=str(session_id_), + user_sub=sub_, ) - except Exception as e: - logger.warning( - "WS git bootstrap failed (session_id=%s): %s", + raise + + project, git = await asyncio.to_thread( + _lock_and_git_sync, + sub_=sub, + email_=email, + session_id_=str(session_id), + force_=force_claim, + ) + await _set_ws_lock_holder( + str(session_id), ws=ws, user_sub=sub, user_email=email + ) + held_editor_locks[str(session_id)] = {"user_sub": sub} + except ProjectLockedError as e: + await ws.send_json( + Message.new( + MessageType.ERROR, + _project_locked_payload( + locked_by_email=e.locked_by_email, + locked_at=e.locked_at, + ), + session_id=str(session_id), + ).to_dict() + ) + await ws.close(code=1008) + return + except PermissionError as e: + # Most common cause: user mismatch between cookie/session and session_id. + logger.warning( + "WS INIT permission denied: %s (session_id=%s client=%s sub=%s)", + e, + session_id, + getattr(ws, "client", None), + sub if "sub" in locals() else None, + ) + await ws.close(code=1008) + return + except Exception as e: + logger.exception( + "WS INIT failed (session_id=%s client=%s)", + session_id, + getattr(ws, "client", None), + ) + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "project_init_failed", "detail": str(e)}, + session_id=session_id, + ).to_dict() + ) + await ws.close(code=1011) + return + template_id = ( + getattr(project, "template_id", None) + if project is not None + else None + ) + project_slug = ( + getattr(project, "slug", None) if project is not None else None + ) + try: + exists = await agent.init( + session_id=session_id, + template_id=template_id, + slug=project_slug, + ) + conversation_history = await agent.ensure_ws_chat_history_ready( + str(session_id) + ) + except ChatHistoryPersistenceError as e: + logger.warning( + "WS INIT chat history unavailable (session_id=%s): %s", + session_id, + e.detail, + ) + conversation_history = [] + except Exception as e: + logger.exception( + "WS INIT agent init failed (session_id=%s)", session_id + ) + with contextlib.suppress(Exception): + from src.db.provisioning import hasura_client_from_env + from src.projects.store import release_project_lock + + await asyncio.to_thread( + release_project_lock, + hasura_client_from_env(), + project_id=str(session_id), + user_sub=sub, + ) + with contextlib.suppress(Exception): + await _clear_ws_lock_holder_if_ws(str(session_id), ws=ws) + held_editor_locks.pop(str(session_id), None) + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "project_init_failed", "detail": str(e)}, + session_id=session_id, + ).to_dict() + ) + await ws.close(code=1011) + return + agent.set_session_controls( session_id, - e, - exc_info=True, + permission_mode=requested_permission_mode, + thinking_level=requested_thinking_level, ) - # In required mode, treat as init failure; otherwise best-effort. + init_data = agent.session_data[session_id] + init_data["exists"] = exists + init_data["conversation_history"] = conversation_history + if actor: + init_data["actor"] = actor + if project is not None: + init_data["project"] = _project_dto(project) + init_data["template_id"] = getattr(project, "template_id", None) + if git is not None: + init_data["git"] = git + + # Baseline bootstrap commit: only when the remote branch doesn't exist yet. try: - from src.gitlab.config import ( - git_sync_required as _git_sync_required, + from src.gitlab.commit_message import ( + deterministic_bootstrap_commit_message, ) + from src.gitlab.config import git_sync_enabled + from src.gitlab.sync import bootstrap_repo_if_empty - if _git_sync_required(): - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "git_bootstrap_failed", "detail": str(e)}, - session_id=session_id, - ).to_dict() + if git_sync_enabled() and isinstance(git, dict): + repo_url = git.get("http_url_to_repo") or git.get( + "repo_http_url" ) - await ws.close(code=1011) - return - except Exception: - logger.exception( - "WS git bootstrap: failed to evaluate git_sync_required() (session_id=%s)", + if isinstance(repo_url, str) and repo_url: + lock = _bootstrap_lock_by_project.get(session_id) + if lock is None: + lock = asyncio.Lock() + _bootstrap_lock_by_project[session_id] = lock + async with lock: + assert agent._session_manager is not None + backend = agent._session_manager.get_backend( + session_id + ) + slug_for_commit = ( + str(getattr(project, "slug", "") or "") + if project is not None + else "" + ).strip() or str(session_id) + commit_msg = deterministic_bootstrap_commit_message( + project_slug=slug_for_commit, + template_id=str(template_id) + if template_id + else None, + project_name=str( + getattr(project, "name", "") or "" + ) + if project is not None + else None, + project_prompt=str( + getattr(project, "project_prompt", "") or "" + ) + if project is not None + else None, + ) + await asyncio.to_thread( + bootstrap_repo_if_empty, + backend, + repo_http_url=str(repo_url), + project_slug=slug_for_commit, + commit_message=commit_msg, + ) + except Exception as e: + logger.warning( + "WS git bootstrap failed (session_id=%s): %s", session_id, + e, + exc_info=True, ) - await _restore_pending_hitl_if_available(agent, str(session_id)) - pending = agent.get_pending_hitl(session_id) - if pending: - init_data["hitl_pending"] = pending - - hook = await agent.emit_session_start_hook(session_id=session_id) - if hook.get("called"): - await ws.send_json( - Message.new( - MessageType.TRACE_EVENT, - { - "phase": "session_start", - "tool_name": "hooks", - "output": hook, - }, - session_id=session_id, - ).to_dict() - ) - - restore_meta = ( - init_data.get("sandbox_restore") - if isinstance(init_data, dict) - else None - ) - logger.info( - "WS INIT ready session_id=%s exists=%s restore=%s", - session_id, - init_data.get("exists") if isinstance(init_data, dict) else None, - restore_meta if isinstance(restore_meta, dict) else {}, - ) - await ws.send_json( - Message.new( - MessageType.INIT, - session_id=session_id, - data=init_data, - ).to_dict() - ) - continue - - if mtype == MessageType.USER.value: - session_id = data.get("session_id") - text_raw = data.get("text") - text = text_raw if isinstance(text_raw, str) else "" - ui_context = _sanitize_user_ui_context(data.get("ui_context")) - user_blocks, blocks_err = _sanitize_user_content_blocks( - data.get("content_blocks") - ) - if blocks_err: - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": blocks_err}, - session_id=session_id or "", - ).to_dict() - ) - continue + # In required mode, treat as init failure; otherwise best-effort. + try: + from src.gitlab.config import ( + git_sync_required as _git_sync_required, + ) - if not session_id or (not text.strip() and not user_blocks): - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "missing session_id or user content"}, - session_id=session_id or "", - ).to_dict() - ) - continue + if _git_sync_required(): + await ws.send_json( + Message.new( + MessageType.ERROR, + { + "error": "git_bootstrap_failed", + "detail": str(e), + }, + session_id=session_id, + ).to_dict() + ) + await ws.close(code=1011) + return + except Exception: + logger.exception( + "WS git bootstrap: failed to evaluate git_sync_required() (session_id=%s)", + session_id, + ) + await _restore_pending_hitl_if_available(agent, str(session_id)) + pending = agent.get_pending_hitl(session_id) + if pending: + init_data["hitl_pending"] = pending - # Give multimodal-only prompts a minimal instruction anchor. - if not text.strip() and user_blocks: - text = "Use the attached image(s) as reference." + hook = await agent.emit_session_start_hook(session_id=session_id) + if hook.get("called"): + await ws.send_json( + Message.new( + MessageType.TRACE_EVENT, + { + "phase": "session_start", + "tool_name": "hooks", + "output": hook, + }, + session_id=session_id, + ).to_dict() + ) - try: - await _ensure_project_context_for_session( - ws=ws, agent=agent, session_id=str(session_id) + restore_meta = ( + init_data.get("sandbox_restore") + if isinstance(init_data, dict) + else None ) - except PermissionError: - await ws.close(code=1008) - return - except Exception as e: - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "project_init_failed", "detail": str(e)}, - session_id=session_id, - ).to_dict() + logger.info( + "WS INIT ready session_id=%s exists=%s restore=%s", + session_id, + init_data.get("exists") + if isinstance(init_data, dict) + else None, + restore_meta if isinstance(restore_meta, dict) else {}, ) - await ws.close(code=1011) - return - - await _restore_pending_hitl_if_available(agent, str(session_id)) - pending = agent.get_pending_hitl(session_id) - if pending: await ws.send_json( Message.new( - MessageType.ERROR, - { - "error": "HITL approval pending. Approve/reject the pending tool call to continue." - }, + MessageType.INIT, session_id=session_id, + data=init_data, ).to_dict() ) continue - lock = _agent_run_lock(str(session_id)) - async with lock: - async for out in agent.send_feedback( - session_id=str(session_id), - feedback=text, - user_content_blocks=user_blocks, - ui_context=ui_context, - ): - await ws.send_json(out) - continue - - if mtype == MessageType.RUNTIME_ERROR.value: - session_id = data.get("session_id") - err = data.get("error") - if not session_id or not isinstance(err, dict): - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "missing session_id or error"}, - session_id=session_id or "", - ).to_dict() - ) - continue - - # Ensure ownership + session context (project/git metadata) matches USER requests. - try: - await _ensure_project_context_for_session( - ws=ws, agent=agent, session_id=str(session_id) - ) - except PermissionError: - await ws.close(code=1008) - return - except Exception as e: - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "project_init_failed", "detail": str(e)}, - session_id=str(session_id), - ).to_dict() + if mtype == MessageType.USER.value: + session_id = data.get("session_id") + text_raw = data.get("text") + text = text_raw if isinstance(text_raw, str) else "" + ui_context = _sanitize_user_ui_context(data.get("ui_context")) + user_blocks, blocks_err = _sanitize_user_content_blocks( + data.get("content_blocks") ) - await ws.close(code=1011) - return + if blocks_err: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": blocks_err}, + session_id=session_id or "", + ).to_dict() + ) + continue - await _restore_pending_hitl_if_available(agent, str(session_id)) - pending = agent.get_pending_hitl(str(session_id)) - if pending: - await ws.send_json( - Message.new( - MessageType.ERROR, - { - "error": "HITL approval pending. Approve/reject the pending tool call to continue." - }, - session_id=str(session_id), - ).to_dict() - ) - continue + if not session_id or (not text.strip() and not user_blocks): + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "missing session_id or user content"}, + session_id=session_id or "", + ).to_dict() + ) + continue + if str(session_id) not in held_editor_locks: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "missing_init_or_lock"}, + session_id=str(session_id), + ).to_dict() + ) + continue - if not _runtime_auto_heal_enabled(): - continue + # Give multimodal-only prompts a minimal instruction anchor. + if not text.strip() and user_blocks: + text = "Use the attached image(s) as reference." - lock = _agent_run_lock(str(session_id)) + try: + await _ensure_project_context_for_session( + ws=ws, agent=agent, session_id=str(session_id) + ) + except PermissionError: + await ws.close(code=1008) + return + except Exception as e: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "project_init_failed", "detail": str(e)}, + session_id=session_id, + ).to_dict() + ) + await ws.close(code=1011) + return - from src.runtime_autoheal import ( - RuntimeAutoHealConfig, - RuntimeAutoHealState, - apply_runtime_auto_heal_decision, - decide_runtime_auto_heal, - ) + await _restore_pending_hitl_if_available(agent, str(session_id)) + pending = agent.get_pending_hitl(session_id) + if pending: + await ws.send_json( + Message.new( + MessageType.ERROR, + { + "error": "HITL approval pending. Approve/reject the pending tool call to continue." + }, + session_id=session_id, + ).to_dict() + ) + continue - fp = str(err.get("fingerprint") or "").strip() or _fingerprint_fallback( - err - ) - now_ms = int(time.time() * 1000) - try: - raw_ts = err.get("ts_ms") - if isinstance(raw_ts, (int, float)) and raw_ts > 0: - now_ms = int(raw_ts) - except (TypeError, ValueError): - logger.debug( - "Failed to parse ts_ms from runtime error event", exc_info=True - ) + lock = _agent_run_lock(str(session_id)) + async with lock: + async for out in agent.send_feedback( + session_id=str(session_id), + feedback=text, + user_content_blocks=user_blocks, + ui_context=ui_context, + ): + await ws.send_json(out) + continue - st = _runtime_autoheal_state_by_project.get(str(session_id)) - if not isinstance(st, RuntimeAutoHealState): - st = RuntimeAutoHealState() + if mtype == MessageType.RUNTIME_ERROR.value: + session_id = data.get("session_id") + err = data.get("error") + if not session_id or not isinstance(err, dict): + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "missing session_id or error"}, + session_id=session_id or "", + ).to_dict() + ) + continue + if str(session_id) not in held_editor_locks: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "missing_init_or_lock"}, + session_id=str(session_id), + ).to_dict() + ) + continue - cfg = RuntimeAutoHealConfig( - enabled=True, - cooldown_s=_runtime_auto_heal_cooldown_s(), - dedupe_window_s=600, - max_attempts_per_fingerprint=_runtime_auto_heal_max_attempts_per_fingerprint(), - ) + # Ensure ownership + session context (project/git metadata) matches USER requests. + try: + await _ensure_project_context_for_session( + ws=ws, agent=agent, session_id=str(session_id) + ) + except PermissionError: + await ws.close(code=1008) + return + except Exception as e: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "project_init_failed", "detail": str(e)}, + session_id=str(session_id), + ).to_dict() + ) + await ws.close(code=1011) + return - decision = decide_runtime_auto_heal( - state=st, fingerprint=fp, cfg=cfg, now_ms=now_ms - ) - if not decision.allowed: - if decision.reason == "max_attempts": + await _restore_pending_hitl_if_available(agent, str(session_id)) + pending = agent.get_pending_hitl(str(session_id)) + if pending: await ws.send_json( Message.new( MessageType.ERROR, { - "error": "runtime_auto_heal_paused", - "detail": "Auto-heal paused for this repeating runtime error (max attempts reached).", - "fingerprint": fp, + "error": "HITL approval pending. Approve/reject the pending tool call to continue." }, session_id=str(session_id), ).to_dict() ) - continue + continue + + if not _runtime_auto_heal_enabled(): + continue - kind = str(err.get("kind") or "window_error") + lock = _agent_run_lock(str(session_id)) + + from src.runtime_autoheal import ( + RuntimeAutoHealConfig, + RuntimeAutoHealState, + apply_runtime_auto_heal_decision, + decide_runtime_auto_heal, + ) - # Optionally include preview logs for load failures. - preview_logs = "" - if kind == "preview_load_failed": + fp = str( + err.get("fingerprint") or "" + ).strip() or _fingerprint_fallback(err) + now_ms = int(time.time() * 1000) try: - if agent._session_manager is not None: - backend = agent._session_manager.get_backend( - str(session_id) - ) - res = await asyncio.to_thread( - backend.execute, - "tail -n 200 /tmp/amicable-preview.log || true", - ) - out = getattr(res, "output", "") or "" - preview_logs = str(out) - except Exception: + raw_ts = err.get("ts_ms") + if isinstance(raw_ts, (int, float)) and raw_ts > 0: + now_ms = int(raw_ts) + except (TypeError, ValueError): logger.debug( - "Failed to fetch preview logs for auto-heal", exc_info=True + "Failed to parse ts_ms from runtime error event", + exc_info=True, ) - preview_logs = "" - from src.runtime_error_feedback import ( - build_runtime_error_feedback_prompt, - ) + st = _runtime_autoheal_state_by_project.get(str(session_id)) + if not isinstance(st, RuntimeAutoHealState): + st = RuntimeAutoHealState() + + cfg = RuntimeAutoHealConfig( + enabled=True, + cooldown_s=_runtime_auto_heal_cooldown_s(), + dedupe_window_s=600, + max_attempts_per_fingerprint=_runtime_auto_heal_max_attempts_per_fingerprint(), + ) + + decision = decide_runtime_auto_heal( + state=st, fingerprint=fp, cfg=cfg, now_ms=now_ms + ) + if not decision.allowed: + if decision.reason == "max_attempts": + await ws.send_json( + Message.new( + MessageType.ERROR, + { + "error": "runtime_auto_heal_paused", + "detail": "Auto-heal paused for this repeating runtime error (max attempts reached).", + "fingerprint": fp, + }, + session_id=str(session_id), + ).to_dict() + ) + continue + + kind = str(err.get("kind") or "window_error") + + # Optionally include preview logs for load failures. + preview_logs = "" + if kind == "preview_load_failed": + try: + if agent._session_manager is not None: + backend = agent._session_manager.get_backend( + str(session_id) + ) + res = await asyncio.to_thread( + backend.execute, + "tail -n 200 /tmp/amicable-preview.log || true", + ) + out = getattr(res, "output", "") or "" + preview_logs = str(out) + except Exception: + logger.debug( + "Failed to fetch preview logs for auto-heal", + exc_info=True, + ) + preview_logs = "" + + from src.runtime_error_feedback import ( + build_runtime_error_feedback_prompt, + ) + + prompt = build_runtime_error_feedback_prompt( + err=err, preview_logs=preview_logs + ) + screenshot: dict[str, Any] | None = None + if _autoheal_screenshot_enabled(): + try: + screenshot = await agent.capture_preview_screenshot( + session_id=str(session_id), + path="/", + # Prefer a bounded payload and faster capture for auto-heal. + full_page=False, + timeout_s=12, + ) + except Exception: + logger.debug( + "Failed to capture preview screenshot for auto-heal", + exc_info=True, + ) + screenshot = None + user_content_blocks = _runtime_autoheal_user_content_blocks( + prompt, screenshot + ) - prompt = build_runtime_error_feedback_prompt( - err=err, preview_logs=preview_logs - ) - screenshot: dict[str, Any] | None = None - if _autoheal_screenshot_enabled(): try: - screenshot = await agent.capture_preview_screenshot( - session_id=str(session_id), - path="/", - # Prefer a bounded payload and faster capture for auto-heal. - full_page=False, - timeout_s=12, - ) + # Avoid queuing auto-heal runs behind user-initiated runs. + await asyncio.wait_for(lock.acquire(), timeout=0.0) + except TimeoutError: + continue except Exception: logger.debug( - "Failed to capture preview screenshot for auto-heal", + "Unexpected error acquiring auto-heal lock for session %s", + session_id, exc_info=True, ) - screenshot = None - user_content_blocks = _runtime_autoheal_user_content_blocks( - prompt, screenshot - ) - - try: - # Avoid queuing auto-heal runs behind user-initiated runs. - await asyncio.wait_for(lock.acquire(), timeout=0.0) - except TimeoutError: - continue - except Exception: - logger.debug( - "Unexpected error acquiring auto-heal lock for session %s", - session_id, - exc_info=True, + continue + + # Mark as handled (attempt count + cooldown) once we actually start the run. + _runtime_autoheal_state_by_project[str(session_id)] = ( + apply_runtime_auto_heal_decision( + state=st, + fingerprint=fp, + attempts=decision.attempts, + now_ms=now_ms, + ) ) - continue - # Mark as handled (attempt count + cooldown) once we actually start the run. - _runtime_autoheal_state_by_project[str(session_id)] = ( - apply_runtime_auto_heal_decision( - state=st, - fingerprint=fp, - attempts=decision.attempts, - now_ms=now_ms, - ) - ) + try: + async for out in agent.send_feedback( + session_id=str(session_id), + feedback=prompt, + user_content_blocks=user_content_blocks, + ): + await ws.send_json(out) + finally: + with contextlib.suppress(RuntimeError): + lock.release() + continue - try: - async for out in agent.send_feedback( - session_id=str(session_id), - feedback=prompt, - user_content_blocks=user_content_blocks, + if mtype == MessageType.HITL_RESPONSE.value: + session_id = data.get("session_id") + interrupt_id = data.get("interrupt_id") + response = data.get("response") + if ( + not session_id + or not isinstance(interrupt_id, str) + or not isinstance(response, dict) ): - await ws.send_json(out) - finally: - with contextlib.suppress(RuntimeError): - lock.release() - continue + await ws.send_json( + Message.new( + MessageType.ERROR, + { + "error": "missing session_id, interrupt_id, or response" + }, + session_id=session_id or "", + ).to_dict() + ) + continue + if str(session_id) not in held_editor_locks: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "missing_init_or_lock"}, + session_id=str(session_id), + ).to_dict() + ) + continue + + try: + await _ensure_project_context_for_session( + ws=ws, agent=agent, session_id=str(session_id) + ) + except PermissionError: + await ws.close(code=1008) + return + except Exception as e: + await ws.send_json( + Message.new( + MessageType.ERROR, + {"error": "project_init_failed", "detail": str(e)}, + session_id=session_id, + ).to_dict() + ) + await ws.close(code=1011) + return + lock = _agent_run_lock(str(session_id)) + async with lock: + async for out in agent.resume_hitl( + session_id=str(session_id), + interrupt_id=interrupt_id, + response=response, + ): + await ws.send_json(out) + continue - if mtype == MessageType.HITL_RESPONSE.value: - session_id = data.get("session_id") - interrupt_id = data.get("interrupt_id") - response = data.get("response") - if ( - not session_id - or not isinstance(interrupt_id, str) - or not isinstance(response, dict) - ): + # Ignore unknowns (frontend can send ping) + if mtype == MessageType.PING.value: await ws.send_json( Message.new( - MessageType.ERROR, - {"error": "missing session_id, interrupt_id, or response"}, - session_id=session_id or "", + MessageType.PING, + {}, + session_id=data.get("session_id") or "", ).to_dict() ) continue + finally: + trace_scope.__exit__(None, None, None) + finally: + if held_editor_locks: + for sid, lock_meta in list(held_editor_locks.items()): + user_sub = str(lock_meta.get("user_sub") or "").strip() + if not sid or not user_sub: + continue - try: - await _ensure_project_context_for_session( - ws=ws, agent=agent, session_id=str(session_id) - ) - except PermissionError: - await ws.close(code=1008) - return - except Exception as e: - await ws.send_json( - Message.new( - MessageType.ERROR, - {"error": "project_init_failed", "detail": str(e)}, - session_id=session_id, - ).to_dict() + async def _release_sync( + *, sid_: str, user_sub_: str, ws_: WebSocket + ) -> None: + from src.db.provisioning import hasura_client_from_env + from src.projects.store import release_project_lock + + await asyncio.to_thread( + release_project_lock, + hasura_client_from_env(), + project_id=sid_, + user_sub=user_sub_, ) - await ws.close(code=1011) - return - lock = _agent_run_lock(str(session_id)) - async with lock: - async for out in agent.resume_hitl( - session_id=str(session_id), - interrupt_id=interrupt_id, - response=response, - ): - await ws.send_json(out) - continue + await _clear_ws_lock_holder_if_ws(sid_, ws=ws_) - # Ignore unknowns (frontend can send ping) - if mtype == MessageType.PING.value: - await ws.send_json( - Message.new( - MessageType.PING, {}, session_id=data.get("session_id") or "" - ).to_dict() - ) - continue - finally: - trace_scope.__exit__(None, None, None) + with contextlib.suppress(Exception): + await _release_sync(sid_=sid, user_sub_=user_sub, ws_=ws) @app.websocket("/") diff --git a/tests/test_api_git_history.py b/tests/test_api_git_history.py new file mode 100644 index 0000000..16cd792 --- /dev/null +++ b/tests/test_api_git_history.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from src.gitlab.client import GitLabCommit + + +@dataclass +class _FakeProject: + project_id: str + slug: str + template_id: str + gitlab_project_id: int | None = None + + +def test_api_git_history_success(monkeypatch) -> None: + from fastapi.testclient import TestClient + + import src.runtimes.ws_server as ws_server + + monkeypatch.setattr(ws_server, "_hasura_enabled", lambda: True) + monkeypatch.setattr( + ws_server, "_get_owner_from_request", lambda _request: ("u1", "u1@example.com") + ) + + import src.db.provisioning as provisioning + import src.gitlab.client as gitlab_client + import src.gitlab.config as gitlab_config + import src.gitlab.integration as gitlab_integration + import src.projects.store as projects_store + + monkeypatch.setattr(provisioning, "hasura_client_from_env", lambda: object()) + monkeypatch.setattr(gitlab_config, "git_sync_enabled", lambda: True) + + proj = _FakeProject( + project_id="p1", + slug="my-proj", + template_id="vite", + gitlab_project_id=123, + ) + monkeypatch.setattr(projects_store, "get_project_by_id", lambda *_a, **_k: proj) + monkeypatch.setattr( + gitlab_integration, + "ensure_gitlab_repo_for_project", + lambda *_a, **_k: (proj, {"gitlab_project_id": 123}), + ) + + calls: list[tuple[int, str | None, int]] = [] + + class _FakeGitLabClient: + def list_project_commits( + self, project_id: int, *, ref_name: str | None = None, limit: int = 30 + ): + calls.append((project_id, ref_name, limit)) + return [ + GitLabCommit( + sha="deadbeefcafebabe", + short_sha="deadbeef", + title="Update app", + message="Update app\n\nwith details", + author_name="Editor User", + author_email="editor@example.com", + authored_date="2026-02-16T10:00:00Z", + web_url="https://git.example/commit/deadbeefcafebabe", + ) + ] + + monkeypatch.setattr( + gitlab_client.GitLabClient, + "from_env", + staticmethod(lambda: _FakeGitLabClient()), + ) + + client = TestClient(ws_server.app) + res = client.get("/api/projects/p1/git/history?limit=5&ref=feature/demo") + assert res.status_code == 200 + body = res.json() + assert len(body["commits"]) == 1 + assert body["commits"][0]["short_sha"] == "deadbeef" + assert body["commits"][0]["author_email"] == "editor@example.com" + assert calls == [(123, "feature/demo", 5)] + + +def test_api_git_history_unavailable_without_project_id(monkeypatch) -> None: + from fastapi.testclient import TestClient + + import src.runtimes.ws_server as ws_server + + monkeypatch.setattr(ws_server, "_hasura_enabled", lambda: True) + monkeypatch.setattr( + ws_server, "_get_owner_from_request", lambda _request: ("u1", "u1@example.com") + ) + + import src.db.provisioning as provisioning + import src.gitlab.config as gitlab_config + import src.gitlab.integration as gitlab_integration + import src.projects.store as projects_store + + monkeypatch.setattr(provisioning, "hasura_client_from_env", lambda: object()) + monkeypatch.setattr(gitlab_config, "git_sync_enabled", lambda: True) + proj = _FakeProject(project_id="p1", slug="my-proj", template_id="vite") + monkeypatch.setattr(projects_store, "get_project_by_id", lambda *_a, **_k: proj) + monkeypatch.setattr( + gitlab_integration, + "ensure_gitlab_repo_for_project", + lambda *_a, **_k: (proj, {}), + ) + + client = TestClient(ws_server.app) + res = client.get("/api/projects/p1/git/history") + assert res.status_code == 409 + assert res.json()["error"] == "git_history_unavailable" + + +def test_api_git_history_disabled(monkeypatch) -> None: + from fastapi.testclient import TestClient + + import src.runtimes.ws_server as ws_server + + monkeypatch.setattr(ws_server, "_hasura_enabled", lambda: True) + monkeypatch.setattr( + ws_server, "_get_owner_from_request", lambda _request: ("u1", "u1@example.com") + ) + + import src.gitlab.config as gitlab_config + + monkeypatch.setattr(gitlab_config, "git_sync_enabled", lambda: False) + + client = TestClient(ws_server.app) + res = client.get("/api/projects/p1/git/history") + assert res.status_code == 409 + assert res.json()["error"] == "git_sync_disabled" diff --git a/tests/test_gitlab_client.py b/tests/test_gitlab_client.py index abd472f..0b92548 100644 --- a/tests/test_gitlab_client.py +++ b/tests/test_gitlab_client.py @@ -91,3 +91,46 @@ def test_delete_project_success(monkeypatch): gl = GitLabClient.from_env(session=s) ok = gl.delete_project(123) assert ok is True + + +def test_list_project_commits_success(monkeypatch): + monkeypatch.setenv("GITLAB_TOKEN", "t") + monkeypatch.setenv("GITLAB_BASE_URL", "https://git.example") + + url = "https://git.example/api/v4/projects/123/repository/commits" + routes = { + ("GET", url): _Resp( + 200, + [ + { + "id": "deadbeefcafebabe", + "short_id": "deadbeef", + "title": "Add sharing", + "message": "Add sharing support", + "author_name": "Editor User", + "author_email": "editor@example.com", + "authored_date": "2026-02-16T10:00:00Z", + "web_url": "https://git.example/commit/deadbeefcafebabe", + } + ], + ) + } + s = _Sess(routes) + gl = GitLabClient.from_env(session=s) + commits = gl.list_project_commits(123, ref_name="main", limit=20) + assert len(commits) == 1 + assert commits[0].short_sha == "deadbeef" + assert commits[0].author_email == "editor@example.com" + assert s.calls[-1][2] == {"per_page": 20, "ref_name": "main"} + + +def test_list_project_commits_invalid_payload(monkeypatch): + monkeypatch.setenv("GITLAB_TOKEN", "t") + monkeypatch.setenv("GITLAB_BASE_URL", "https://git.example") + + url = "https://git.example/api/v4/projects/123/repository/commits" + routes = {("GET", url): _Resp(200, {"unexpected": True})} + s = _Sess(routes) + gl = GitLabClient.from_env(session=s) + with pytest.raises(GitLabError): + gl.list_project_commits(123) diff --git a/tests/test_projects_store.py b/tests/test_projects_store.py index 8bbffe0..8d712f7 100644 --- a/tests/test_projects_store.py +++ b/tests/test_projects_store.py @@ -6,13 +6,21 @@ from src.projects.store import ( ProjectOwner, + acquire_project_lock, + add_project_member, create_project, ensure_project_for_id, get_project_by_id, get_project_by_slug, + get_project_lock, hard_delete_project_row, + is_project_lock_holder, + list_project_members, list_projects, mark_project_deleted, + release_project_lock, + remove_project_member, + remove_project_member_by_email, rename_project, slugify, ) @@ -21,6 +29,7 @@ class FakeHasuraClient: def __init__(self) -> None: self.projects: dict[str, dict] = {} + self.project_members: dict[tuple[str, str], dict] = {} class _Cfg: source_name = "default" @@ -43,13 +52,59 @@ def run_sql(self, sql: str, *, read_only: bool = False): # noqa: C901 if m and sql_l.startswith("select") and "from amicable_meta.projects" in sql_l: pid = m.group(1) row = self.projects.get(pid) - if not row or row.get("deleted_at"): + has_deleted_filter = "deleted_at is null" in sql_l + if not row or (has_deleted_filter and row.get("deleted_at")): return { "result_type": "TuplesOk", "result": [ ["project_id"], ], } + if ( + "select locked_by_sub, locked_by_email, locked_at" in sql_l + and "locked_by_sub is not null" in sql_l + ): + if not row.get("locked_by_sub"): + return { + "result_type": "TuplesOk", + "result": [["locked_by_sub"]], + } + return { + "result_type": "TuplesOk", + "result": [ + ["locked_by_sub", "locked_by_email", "locked_at"], + [ + row.get("locked_by_sub"), + row.get("locked_by_email"), + row.get("locked_at"), + ], + ], + } + if ( + "select project_id, owner_sub, owner_email, name, slug, deleted_at" + in sql_l + ): + return { + "result_type": "TuplesOk", + "result": [ + [ + "project_id", + "owner_sub", + "owner_email", + "name", + "slug", + "deleted_at", + ], + [ + row["project_id"], + row["owner_sub"], + row["owner_email"], + row["name"], + row["slug"], + row.get("deleted_at"), + ], + ], + } header = [ "project_id", "owner_sub", @@ -156,6 +211,186 @@ def run_sql(self, sql: str, *, read_only: bool = False): # noqa: C901 ) return {"result_type": "TuplesOk", "result": out} + # List projects by membership (new EXISTS query). + if ( + sql_l.startswith("select p.project_id") + and "from amicable_meta.projects p" in sql_l + and "from amicable_meta.project_members pm" in sql_l + ): + sub_match = re.search(r"pm.user_sub\s*=\s*'([^']+)'", sql, flags=re.I) + email_match = re.search(r"pm.user_email\s*=\s*'([^']+)'", sql, flags=re.I) + sub = sub_match.group(1) if sub_match else "" + email = email_match.group(1) if email_match else "" + header = [ + "project_id", + "owner_sub", + "owner_email", + "name", + "slug", + "project_prompt", + "sandbox_id", + "template_id", + "gitlab_project_id", + "gitlab_path", + "gitlab_web_url", + "created_at", + "updated_at", + ] + out = [header] + for r in self.projects.values(): + if r.get("deleted_at"): + continue + key_email = (r["project_id"], email.lower()) + has_member = any( + m.get("project_id") == r["project_id"] + and ( + (sub and m.get("user_sub") == sub) + or (email and m.get("user_email") == email.lower()) + ) + for m in self.project_members.values() + ) + if not has_member and key_email not in self.project_members: + continue + out.append( + [ + r["project_id"], + r["owner_sub"], + r["owner_email"], + r["name"], + r["slug"], + r.get("project_prompt"), + r.get("sandbox_id"), + r.get("template_id"), + r.get("gitlab_project_id"), + r.get("gitlab_path"), + r.get("gitlab_web_url"), + r.get("created_at"), + r.get("updated_at"), + ] + ) + return {"result_type": "TuplesOk", "result": out} + + # Membership existence checks. + if ( + sql_l.startswith("select 1") + and "from amicable_meta.project_members" in sql_l + and "where project_id" in sql_l + ): + pid = re.search(r"project_id\s*=\s*'([^']+)'", sql, flags=re.I).group(1) # type: ignore[union-attr] + sub_match = re.search(r"user_sub\s*=\s*'([^']+)'", sql, flags=re.I) + email_match = re.search(r"user_email\s*=\s*'([^']+)'", sql, flags=re.I) + sub = sub_match.group(1) if sub_match else "" + email = email_match.group(1).lower() if email_match else "" + found = any( + m.get("project_id") == pid + and ( + (sub and m.get("user_sub") == sub) + or (email and m.get("user_email") == email) + ) + for m in self.project_members.values() + ) + return ( + {"result_type": "TuplesOk", "result": [["1"], [1]]} + if found + else { + "result_type": "TuplesOk", + "result": [["1"]], + } + ) + + # Member lookup/list. + if ( + sql_l.startswith( + "select project_id, user_sub, user_email, added_at, added_by_sub" + ) + and "from amicable_meta.project_members" in sql_l + ): + pid = re.search(r"project_id\s*=\s*'([^']+)'", sql, flags=re.I).group(1) # type: ignore[union-attr] + email_match = re.search(r"user_email\s*=\s*'([^']+)'", sql, flags=re.I) + header = [ + "project_id", + "user_sub", + "user_email", + "added_at", + "added_by_sub", + ] + out = [header] + for m in self.project_members.values(): + if m.get("project_id") != pid: + continue + if email_match and m.get("user_email") != email_match.group(1).lower(): + continue + out.append( + [ + m.get("project_id"), + m.get("user_sub"), + m.get("user_email"), + m.get("added_at"), + m.get("added_by_sub"), + ] + ) + return {"result_type": "TuplesOk", "result": out} + + # INSERT member. + if sql_l.startswith("insert into amicable_meta.project_members"): + vals = re.search(r"values\s*\((.*)\)\s*on conflict", sql, flags=re.I | re.S) + assert vals + raw_parts = re.findall(r"'(?:''|[^'])*'|NULL", vals.group(1)) + parts = [ + p[1:-1].replace("''", "'") + if p.startswith("'") and p.endswith("'") + else None + for p in raw_parts + ] + pid = parts[0] or "" + user_sub = parts[1] + user_email = (parts[2] or "").lower() + added_by_sub = parts[3] + key = (pid, user_email) + existing = self.project_members.get(key) + if existing: + if user_sub: + existing["user_sub"] = user_sub + else: + self.project_members[key] = { + "project_id": pid, + "user_sub": user_sub, + "user_email": user_email, + "added_at": "t0", + "added_by_sub": added_by_sub, + } + return {"result_type": "CommandOk", "result": []} + + # DELETE member by sub. + if ( + sql_l.startswith("delete from amicable_meta.project_members") + and "and user_sub" in sql_l + ): + pid = re.search(r"project_id\s*=\s*'([^']+)'", sql, flags=re.I).group(1) # type: ignore[union-attr] + sub = re.search(r"user_sub\s*=\s*'([^']+)'", sql, flags=re.I).group(1) # type: ignore[union-attr] + keys = [ + k + for k, m in self.project_members.items() + if m.get("project_id") == pid and m.get("user_sub") == sub + ] + for k in keys: + self.project_members.pop(k, None) + return {"result_type": "CommandOk", "result": []} + + # DELETE member by email. + if ( + sql_l.startswith("delete from amicable_meta.project_members") + and "and user_email" in sql_l + ): + pid = re.search(r"project_id\s*=\s*'([^']+)'", sql, flags=re.I).group(1) # type: ignore[union-attr] + email = ( + re.search(r"user_email\s*=\s*'([^']+)'", sql, flags=re.I) + .group(1) + .lower() + ) # type: ignore[union-attr] + self.project_members.pop((pid, email), None) + return {"result_type": "CommandOk", "result": []} + # INSERT project. if sql_l.startswith("insert into amicable_meta.projects"): cols_match = re.search( @@ -202,6 +437,9 @@ def run_sql(self, sql: str, *, read_only: bool = False): # noqa: C901 "created_at": "t0", "updated_at": "t0", "deleted_at": None, + "locked_by_sub": None, + "locked_by_email": None, + "locked_at": None, } return {"result_type": "CommandOk", "result": []} @@ -245,6 +483,46 @@ def run_sql(self, sql: str, *, read_only: bool = False): # noqa: C901 row["updated_at"] = "t_del" return {"result_type": "CommandOk", "result": []} + # UPDATE acquire/release lock. + if ( + sql_l.startswith("update amicable_meta.projects") + and "set locked_by_sub" in sql_l + ): + pid = re.search(r"where project_id\s*=\s*'([^']+)'", sql, flags=re.I).group( + 1 + ) # type: ignore[union-attr] + row = self.projects.get(pid) + if not row or row.get("deleted_at"): + return {"result_type": "CommandOk", "result": []} + if "locked_by_sub = null" in sql_l: + current_holder_match = re.search( + r"and locked_by_sub\s*=\s*'([^']+)'", sql, flags=re.I + ) + current_holder = ( + current_holder_match.group(1) if current_holder_match else None + ) + if current_holder and row.get("locked_by_sub") == current_holder: + row["locked_by_sub"] = None + row["locked_by_email"] = None + row["locked_at"] = None + return {"result_type": "CommandOk", "result": []} + + new_sub = re.search( + r"set locked_by_sub\s*=\s*'([^']+)'", sql, flags=re.I + ).group(1) # type: ignore[union-attr] + new_email = re.search( + r"locked_by_email\s*=\s*'([^']*)'", sql, flags=re.I + ).group(1) # type: ignore[union-attr] + if "and (locked_by_sub is null or locked_by_sub =" in sql_l and row.get( + "locked_by_sub" + ) not in (None, new_sub): + return {"result_type": "CommandOk", "result": []} + row["locked_by_sub"] = new_sub + row["locked_by_email"] = new_email + row["locked_at"] = "t_lock" + row["updated_at"] = "t_lock" + return {"result_type": "CommandOk", "result": []} + # DELETE. if sql_l.startswith("delete from amicable_meta.projects"): pid = re.search(r"where project_id\s*=\s*'([^']+)'", sql, flags=re.I).group( @@ -318,3 +596,78 @@ def test_ensure_project_for_id_owner_mismatch() -> None: with pytest.raises(PermissionError): ensure_project_for_id(c, owner=owner2, project_id="abc-123") + + +def test_project_members_crud_and_last_member_guard() -> None: + c = FakeHasuraClient() + owner = ProjectOwner(sub="owner-sub", email="owner@example.com") + editor = ProjectOwner(sub="editor-sub", email="editor@example.com") + project = create_project(c, owner=owner, name="Shared Project") + + members = list_project_members(c, project_id=project.project_id) + assert [m.user_email for m in members] == ["owner@example.com"] + + added = add_project_member( + c, + project_id=project.project_id, + user_sub=editor.sub, + user_email=editor.email, + added_by_sub=owner.sub, + ) + assert added.user_sub == editor.sub + assert added.user_email == editor.email + + # Members can access the project via standard owner+member lookup. + shared_view = get_project_by_id(c, owner=editor, project_id=project.project_id) + assert shared_view is not None + + assert remove_project_member(c, project_id=project.project_id, user_sub=editor.sub) + # Removing the last remaining member is blocked. + assert not remove_project_member_by_email( + c, project_id=project.project_id, user_email=owner.email + ) + + +def test_project_lock_acquire_force_and_release() -> None: + c = FakeHasuraClient() + owner = ProjectOwner(sub="u1", email="u1@example.com") + project = create_project(c, owner=owner, name="Lock Project") + + lock1 = acquire_project_lock( + c, + project_id=project.project_id, + user_sub="u1", + user_email="u1@example.com", + ) + assert lock1 is not None + assert lock1.locked_by_sub == "u1" + assert is_project_lock_holder(c, project_id=project.project_id, user_sub="u1") + + lock2 = acquire_project_lock( + c, + project_id=project.project_id, + user_sub="u2", + user_email="u2@example.com", + ) + assert lock2 is None + assert not is_project_lock_holder(c, project_id=project.project_id, user_sub="u2") + + claimed = acquire_project_lock( + c, + project_id=project.project_id, + user_sub="u2", + user_email="u2@example.com", + force=True, + ) + assert claimed is not None + assert claimed.locked_by_sub == "u2" + assert is_project_lock_holder(c, project_id=project.project_id, user_sub="u2") + + # Releasing by non-holder is a no-op. + release_project_lock(c, project_id=project.project_id, user_sub="u1") + still_locked = get_project_lock(c, project_id=project.project_id) + assert still_locked is not None + assert still_locked.locked_by_sub == "u2" + + release_project_lock(c, project_id=project.project_id, user_sub="u2") + assert get_project_lock(c, project_id=project.project_id) is None diff --git a/tests/test_ws_member_api.py b/tests/test_ws_member_api.py new file mode 100644 index 0000000..d3ee1bc --- /dev/null +++ b/tests/test_ws_member_api.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from src.projects.store import ProjectMember + + +@dataclass +class _FakeProject: + project_id: str + owner_sub: str + owner_email: str + created_at: str = "t0" + + +def test_api_project_members_list_includes_owner(monkeypatch) -> None: + from fastapi.testclient import TestClient + + import src.runtimes.ws_server as ws_server + + project = _FakeProject( + project_id="p1", + owner_sub="owner-sub", + owner_email="owner@example.com", + ) + + monkeypatch.setattr(ws_server, "_require_hasura", lambda: None) + monkeypatch.setattr( + ws_server, + "_get_owner_from_request", + lambda _request: ("owner-sub", "owner@example.com"), + ) + + import src.db.provisioning as provisioning + import src.projects.store as projects_store + + monkeypatch.setattr(provisioning, "hasura_client_from_env", lambda: object()) + monkeypatch.setattr(projects_store, "get_project_by_id", lambda *_a, **_k: project) + monkeypatch.setattr( + projects_store, + "list_project_members", + lambda *_a, **_k: [ + ProjectMember( + project_id="p1", + user_sub="editor-sub", + user_email="editor@example.com", + added_at="t0", + added_by_sub="owner-sub", + ) + ], + ) + + client = TestClient(ws_server.app) + res = client.get("/api/projects/p1/members") + assert res.status_code == 200 + body = res.json() + assert body["is_owner"] is True + assert len(body["members"]) == 2 + assert body["members"][0]["role"] == "owner" + assert body["members"][0]["email"] == "owner@example.com" + assert body["members"][1]["role"] == "editor" + + +def test_api_project_members_add(monkeypatch) -> None: + from fastapi.testclient import TestClient + + import src.runtimes.ws_server as ws_server + + project = _FakeProject( + project_id="p1", + owner_sub="owner-sub", + owner_email="owner@example.com", + ) + + monkeypatch.setattr(ws_server, "_require_hasura", lambda: None) + + async def _owner_access(_request, *, project_id: str): + _ = project_id + return project + + monkeypatch.setattr(ws_server, "_ensure_project_owner_access_async", _owner_access) + monkeypatch.setattr( + ws_server, + "_get_actor_from_request", + lambda _request: { + "sub": "owner-sub", + "email": "owner@example.com", + "name": "Owner", + }, + ) + + import src.db.provisioning as provisioning + import src.projects.store as projects_store + + monkeypatch.setattr(provisioning, "hasura_client_from_env", lambda: object()) + monkeypatch.setattr( + projects_store, + "add_project_member", + lambda *_a, **_k: ProjectMember( + project_id="p1", + user_sub=None, + user_email="new@example.com", + added_at="t1", + added_by_sub="owner-sub", + ), + ) + + client = TestClient(ws_server.app) + res = client.post("/api/projects/p1/members", json={"email": "NEW@example.com"}) + assert res.status_code == 200 + member = res.json()["member"] + assert member["email"] == "new@example.com" + assert member["role"] == "editor" + assert member["pending"] is True + + +def test_api_project_members_remove_owner_guard(monkeypatch) -> None: + from fastapi.testclient import TestClient + + import src.runtimes.ws_server as ws_server + + project = _FakeProject( + project_id="p1", + owner_sub="owner-sub", + owner_email="owner@example.com", + ) + + monkeypatch.setattr(ws_server, "_require_hasura", lambda: None) + + async def _owner_access(_request, *, project_id: str): + _ = project_id + return project + + monkeypatch.setattr(ws_server, "_ensure_project_owner_access_async", _owner_access) + + client = TestClient(ws_server.app) + res = client.delete("/api/projects/p1/members/by-email/owner@example.com") + assert res.status_code == 400 + assert res.json()["error"] == "cannot_remove_owner" diff --git a/tests/test_ws_project_locking.py b/tests/test_ws_project_locking.py new file mode 100644 index 0000000..3fe7b9d --- /dev/null +++ b/tests/test_ws_project_locking.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from src.projects.store import ProjectLock + + +@dataclass +class _FakeProject: + project_id: str + slug: str + template_id: str + owner_sub: str = "owner-sub" + owner_email: str = "owner@example.com" + + +class _FakeSessionManager: + def get_backend(self, _session_id: str): + return object() + + +class _FakeAgent: + def __init__(self) -> None: + self.session_data: dict[str, dict] = {} + self._session_manager = _FakeSessionManager() + + async def init( + self, + session_id: str, + template_id: str | None = None, + slug: str | None = None, + ) -> bool: + _ = template_id, slug + self.session_data.setdefault(session_id, {}) + return True + + async def ensure_ws_chat_history_ready(self, _session_id: str): + return [] + + async def restore_pending_hitl_from_checkpoint(self, _session_id: str): + return None + + def get_pending_hitl(self, _session_id: str): + return None + + async def emit_session_start_hook(self, *, session_id: str): + _ = session_id + return {"called": False} + + def set_session_controls(self, _session_id: str, **_kwargs): + return None + + +def _setup_ws_lock_mocks(monkeypatch, *, actors: list[dict[str, str]]): + import src.runtimes.ws_server as ws_server + + project = _FakeProject(project_id="p1", slug="my-proj", template_id="vite") + lock_state: dict[str, ProjectLock | None] = {"lock": None} + + monkeypatch.setattr(ws_server, "_agent", _FakeAgent()) + monkeypatch.setattr(ws_server, "_require_hasura", lambda: None) + monkeypatch.setattr(ws_server, "_require_auth", lambda _ws: None) + + def _next_actor(_ws): + assert actors, "actor list exhausted" + return actors.pop(0) + + monkeypatch.setattr(ws_server, "_get_actor_from_ws", _next_actor) + ws_server._cleanup_project_runtime_state("p1") + + import src.db.provisioning as provisioning + import src.gitlab.config as gitlab_config + import src.gitlab.integration as gitlab_integration + import src.projects.store as projects_store + + monkeypatch.setattr(provisioning, "hasura_client_from_env", lambda: object()) + monkeypatch.setattr(gitlab_config, "git_sync_enabled", lambda: False) + monkeypatch.setattr( + gitlab_integration, + "ensure_gitlab_repo_for_project", + lambda *_a, **_k: (project, {}), + ) + monkeypatch.setattr(projects_store, "get_project_by_id", lambda *_a, **_k: project) + monkeypatch.setattr( + projects_store, + "get_project_lock", + lambda *_a, **_k: lock_state["lock"], + ) + + def _acquire_lock( + _client, + *, + project_id: str, + user_sub: str, + user_email: str, + force: bool = False, + ): + _ = project_id + current = lock_state["lock"] + if not force and current is not None and current.locked_by_sub != user_sub: + return None + new_lock = ProjectLock( + project_id="p1", + locked_by_sub=user_sub, + locked_by_email=user_email, + locked_at="2026-02-16T10:00:00Z", + ) + lock_state["lock"] = new_lock + return new_lock + + def _release_lock(_client, *, project_id: str, user_sub: str): + _ = project_id + current = lock_state["lock"] + if current is not None and current.locked_by_sub == user_sub: + lock_state["lock"] = None + + monkeypatch.setattr(projects_store, "acquire_project_lock", _acquire_lock) + monkeypatch.setattr(projects_store, "release_project_lock", _release_lock) + return ws_server, lock_state + + +def test_ws_init_rejects_when_project_locked(monkeypatch) -> None: + from fastapi.testclient import TestClient + + ws_server, lock_state = _setup_ws_lock_mocks( + monkeypatch, + actors=[ + {"sub": "u1", "email": "u1@example.com", "name": "User One"}, + {"sub": "u2", "email": "u2@example.com", "name": "User Two"}, + ], + ) + + client = TestClient(ws_server.app) + with client.websocket_connect("/ws") as ws1: + ws1.send_json({"type": "init", "data": {"session_id": "p1"}}) + init1 = ws1.receive_json() + assert init1["type"] == "init" + + with client.websocket_connect("/ws") as ws2: + ws2.send_json({"type": "init", "data": {"session_id": "p1"}}) + err = ws2.receive_json() + assert err["type"] == "error" + assert err["data"]["code"] == "project_locked" + assert err["data"]["locked_by"]["email"] == "u1@example.com" + + assert lock_state["lock"] is None + + +def test_ws_force_claim_notifies_previous_holder(monkeypatch) -> None: + from fastapi.testclient import TestClient + + ws_server, lock_state = _setup_ws_lock_mocks( + monkeypatch, + actors=[{"sub": "u2", "email": "u2@example.com", "name": "User Two"}], + ) + lock_state["lock"] = ProjectLock( + project_id="p1", + locked_by_sub="u1", + locked_by_email="u1@example.com", + locked_at="2026-02-16T09:59:00Z", + ) + + class _HolderWS: + def __init__(self): + self.messages: list[dict] = [] + self.closed = False + + async def send_json(self, payload: dict): + self.messages.append(payload) + + async def close(self, code: int = 1000): + _ = code + self.closed = True + + holder_ws = _HolderWS() + + async def _fake_get_holder(_project_id: str): + return {"ws": holder_ws, "user_sub": "u1", "user_email": "u1@example.com"} + + monkeypatch.setattr(ws_server, "_get_ws_lock_holder", _fake_get_holder) + + client = TestClient(ws_server.app) + with client.websocket_connect("/ws") as ws2: + ws2.send_json( + {"type": "init", "data": {"session_id": "p1", "force_claim": True}} + ) + init2 = ws2.receive_json() + assert init2["type"] == "init" + + assert holder_ws.messages + claim_payload = holder_ws.messages[0] + assert claim_payload["type"] == "session_claimed" + assert claim_payload["data"]["claimed_by"]["email"] == "u2@example.com" + assert holder_ws.closed is True + + assert lock_state["lock"] is None