Can you investigate the capnweb library, as used in @packages/opencode/src/share/share-next.ts , and see how capnweb's RPC mechanism could handle some of the session persistence issues we are considering here?
So instead of the REST HTTP interface, we could use the capnweb transport in the design of this session persistence. Would it be better to use capnweb as an alternative transport layer within the A2A protocol?
Called the Read tool with the following input: {"filePath":"/home/jm/data/code/opencode/packages/opencode/src/share/share-next.ts"}
/home/jm/data/code/opencode/packages/opencode/src/share/share-next.ts
file
1: import { Bus } from "@/bus"
2: import { Config } from "@/config/config"
3: import { newHttpBatchRpcSession } from "capnweb"
4: import { ulid } from "ulid"
5: import { Provider } from "@/provider/provider"
6: import { Session } from "@/session"
7: import { MessageV2 } from "@/session/message-v2"
8: import { Database, eq } from "@/storage/db"
9: import { SessionShareTable } from "./share.sql"
10: import { Log } from "@/util/log"
11: import type * as SDK from "@opencode-ai/sdk/v2"
12: import type { ShareRpc, SyncData, SyncInfo } from "./rpc-contract"
13:
14: export namespace ShareNext {
15: const log = Log.create({ service: "share-next" })
16:
17: export async function url() {
18: return Config.get().then((x) => x.enterprise?.url ?? "https://opencode.j9xym.com")
19: }
20:
21: const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
22: const transport = process.env["OPENCODE_SHARE_TRANSPORT"] === "rpc" ? "rpc" : "http"
23: const rpcKey = process.env["OPENCODE_SHARE_RPC_KEY"]
24:
25: function rpcHeaders() {
26: if (!rpcKey) return undefined
27: return { "x-opencode-share-key": rpcKey }
28: }
29:
30: export async function init() {
31: if (disabled) return
32: Bus.subscribe(Session.Event.Updated, async (evt) => {
33: await sync(evt.properties.info.id, [
34: {
35: type: "session",
36: data: evt.properties.info,
37: },
38: ])
39: })
40: Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
41: await sync(evt.properties.info.sessionID, [
42: {
43: type: "message",
44: data: evt.properties.info,
45: },
46: ])
47: if (evt.properties.info.role === "user") {
48: await sync(evt.properties.info.sessionID, [
49: {
50: type: "model",
51: data: [
52: await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then(
53: (m) => m,
54: ),
55: ],
56: },
57: ])
58: }
59: })
60: Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
61: await sync(evt.properties.part.sessionID, [
62: {
63: type: "part",
64: data: evt.properties.part,
65: },
66: ])
67: })
68: Bus.subscribe(Session.Event.Diff, async (evt) => {
69: await sync(evt.properties.sessionID, [
70: {
71: type: "session_diff",
72: data: evt.properties.diff,
73: },
74: ])
75: })
76: }
77:
78: export async function create(sessionID: string) {
79: if (disabled) return { id: "", url: "", secret: "" }
80: log.info("creating share", { sessionID })
81: const result =
82: transport === "rpc"
83: ? await newHttpBatchRpcSession(
84: new Request(${await url()}/rpc/share, {
85: headers: rpcHeaders(),
86: }),
87: ).createShare(sessionID)
88: : await fetch(${await url()}/api/share, {
89: method: "POST",
90: headers: {
91: "Content-Type": "application/json",
92: },
93: body: JSON.stringify({ sessionID: sessionID }),
94: })
95: .then((x) => x.json())
96: .then((x) => x as SyncInfo)
97: Database.use((db) =>
98: db
99: .insert(SessionShareTable)
100: .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
101: .onConflictDoUpdate({
102: target: SessionShareTable.session_id,
103: set: { id: result.id, secret: result.secret, url: result.url },
104: })
105: .run(),
106: )
107: fullSync(sessionID)
108: return result
109: }
110:
111: function get(sessionID: string) {
112: const row = Database.use((db) =>
113: db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
114: )
115: if (!row) return
116: return { id: row.id, secret: row.secret, url: row.url }
117: }
118:
119: type Data = SyncData
120:
121: const queue = new Map<string, { timeout: NodeJS.Timeout; data: Map<string, Data> }>()
122: async function sync(sessionID: string, data: Data[]) {
123: if (disabled) return
124: const existing = queue.get(sessionID)
125: if (existing) {
126: for (const item of data) {
127: existing.data.set("id" in item ? (item.id as string) : ulid(), item)
128: }
129: return
130: }
131:
132: const dataMap = new Map<string, Data>()
133: for (const item of data) {
134: dataMap.set("id" in item ? (item.id as string) : ulid(), item)
135: }
136:
137: const timeout = setTimeout(async () => {
138: const queued = queue.get(sessionID)
139: if (!queued) return
140: queue.delete(sessionID)
141: const share = get(sessionID)
142: if (!share) return
143:
144: if (transport === "rpc") {
145: await newHttpBatchRpcSession(
146: new Request(${await url()}/rpc/share, {
147: headers: rpcHeaders(),
148: }),
149: ).syncShare(share.id, share.secret, Array.from(queued.data.values()))
150: return
151: }
152:
153: await fetch(${await url()}/api/share/${share.id}/sync, {
154: method: "POST",
155: headers: {
156: "Content-Type": "application/json",
157: },
158: body: JSON.stringify({
159: secret: share.secret,
160: data: Array.from(queued.data.values()),
161: }),
162: })
163: }, 1000)
164: queue.set(sessionID, { timeout, data: dataMap })
165: }
166:
167: export async function remove(sessionID: string) {
168: if (disabled) return
169: log.info("removing share", { sessionID })
170: const share = get(sessionID)
171: if (!share) return
172: await fetch(${await url()}/api/share/${share.id}, {
173: method: "DELETE",
174: headers: {
175: "Content-Type": "application/json",
176: },
177: body: JSON.stringify({
178: secret: share.secret,
179: }),
180: })
181: Database.use((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
182: }
183:
184: async function fullSync(sessionID: string) {
185: log.info("full sync", { sessionID })
186: const session = await Session.get(sessionID)
187: const diffs = await Session.diff(sessionID)
188: const messages = await Array.fromAsync(MessageV2.stream(sessionID))
189: const models = await Promise.all(
190: messages
191: .filter((m) => m.info.role === "user")
192: .map((m) => (m.info as SDK.UserMessage).model)
193: .map((m) => Provider.getModel(m.providerID, m.modelID).then((m) => m)),
194: )
195: await sync(sessionID, [
196: {
197: type: "session",
198: data: session,
199: },
200: ...messages.map((x) => ({
201: type: "message" as const,
202: data: x.info,
203: })),
204: ...messages.flatMap((x) => x.parts.map((y) => ({ type: "part" as const, data: y }))),
205: {
206: type: "session_diff",
207: data: diffs,
208: },
209: {
210: type: "model",
211: data: models,
212: },
213: ])
214: }
215: }
216:
(End of file - total 216 lines)
Instructions from: /home/jm/data/code/opencode/packages/opencode/AGENTS.md
# opencode database guide
Database
- Schema: Drizzle schema lives in
src/**/*.sql.ts.
- Naming: tables and columns use snake_case; join columns are
  <entity>_id; indexes are <table>_<column>_idx.
- Migrations: generated by Drizzle Kit using
drizzle.config.ts (schema: ./src/**/*.sql.ts, output: ./migration).
- Command:
bun run db generate --name <slug>.
- Output: creates
migration/<timestamp>_<slug>/migration.sql and snapshot.json.
- Tests: migration tests should read the per-folder layout (no
_journal.json).