Feat/chunk ws payload #4621
base: main
Conversation
…dispatching in WSChannel
Walkthrough: This change set centers on the connection subsystem and the editor. It introduces a callback API for the length-field-based frame decoder, with asynchronous batching and a single-flight guard; outbound traffic gains chunked, queued sending, and send now returns a Promise; the buffer layer adds a shared 4-byte read; multiple drivers and a large number of tests are adjusted to match; a large-file optimization utility and its integration points are added to the editor; plus several small fixes and doc-comment tweaks.
Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
actor App as Application
participant WS as WSWebSocketConnection
participant Q as SendQueue
participant Sock as WebSocket
participant Dec as LengthFieldBasedFrameDecoder
Note over WS,Q: Outbound: queued + chunked send (chunkSize)
App->>WS: send(data) : Promise<void>
WS->>Q: enqueue(data)
alt processor not running
WS->>WS: processSendQueue()
end
loop queue not empty
WS->>WS: framed = construct(data).dumpAndOwn()
loop per chunk (chunkSize)
WS->>Sock: send(chunk)
Sock-->>WS: ack / error
end
WS->>WS: framed.dispose()
WS-->>App: resolve/reject
end
Note over Sock,Dec: Inbound: arraybuffer -> decoder -> onData
Sock-->>WS: message(ArrayBuffer)
WS->>Dec: push(bytes)
Dec-->>WS: onData(frame) (async callback)
WS-->>App: forward frame

sequenceDiagram
participant Src as Data source (chunked)
participant Dec as FrameDecoder
participant Cur as Cursor/Buffers
participant Lsn as onData Listener
Note over Dec: Decoder: single-flight + async batching
Src->>Dec: push(chunk)
Dec->>Dec: schedule processBuffers()
loop up to N iterations or until data is exhausted
Dec->>Cur: read4() -> length
alt not enough data for a complete frame
Dec->>Dec: break and wait for more data
else complete frame available
Dec->>Dec: splice(consume)
Dec-->>Lsn: onData(payload) (try/catch error isolation)
end
end
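To make the outbound/inbound flow above concrete, here is a self-contained sketch of generic length-prefixed framing plus chunked delivery. It only illustrates the idea; it is not the project's actual wire format (which also involves an indicator and Fury's binary writer), and all names here are illustrative.

```ts
// Generic length-prefixed framing sketch (illustration only, not the project's frame layout).
function frame(payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(4 + payload.byteLength);
  new DataView(out.buffer).setUint32(0, payload.byteLength, true); // 4-byte little-endian length
  out.set(payload, 4);
  return out;
}

function* chunks(framed: Uint8Array, chunkSize: number): Generator<Uint8Array> {
  for (let i = 0; i < framed.byteLength; i += chunkSize) {
    yield framed.subarray(i, Math.min(i + chunkSize, framed.byteLength));
  }
}

// Receiver side: accumulate chunks, emit complete payloads.
class SimpleDecoder {
  private buf = new Uint8Array(0);
  constructor(private onData: (payload: Uint8Array) => void) {}

  push(chunk: Uint8Array): void {
    const next = new Uint8Array(this.buf.byteLength + chunk.byteLength);
    next.set(this.buf);
    next.set(chunk, this.buf.byteLength);
    this.buf = next;
    while (this.buf.byteLength >= 4) {
      const len = new DataView(this.buf.buffer, this.buf.byteOffset).getUint32(0, true);
      if (this.buf.byteLength < 4 + len) break; // incomplete frame: wait for more data
      this.onData(this.buf.slice(4, 4 + len));  // emit one complete payload
      this.buf = this.buf.slice(4 + len);       // consume the frame
    }
  }
}

// Usage: a 3-byte payload survives being split into 2-byte chunks.
const dec = new SimpleDecoder((p) => console.log('frame', p));
for (const c of chunks(frame(new Uint8Array([1, 2, 3])), 2)) dec.push(c);
```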
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
Actionable comments posted: 8
🧹 Nitpick comments (22)
packages/components/src/recycle-tree/tree/TreeNode.ts (1)
42-50
: Suggestion: decide whether negative indices need to be supported, or keep the current behavior
On inspection, spliceArray is only called from the following internal locations, and the start/deleteCount passed in are always pre-computed non-negative values, so there is currently no negative-index scenario:
- packages/components/src/recycle-tree/tree/TreeNode.ts (defined at line 41, called at lines 1194, 1238, 1325, 1357)
- packages/components/src/recycle-tree/RecycleTree.tsx (called at lines 866, 870)
If spliceArray is only an internal utility with no public documentation or API exposure risk, the current implementation can stay as-is without extra normalization for negative indices. If there are plans to expose spliceArray as a public API, or if behavior fully consistent with Array.prototype.splice is wanted, normalize start and deleteCount as in the original review example and add two kinds of unit tests:
- deletion with a negative index (e.g. arr=[1,2,3], start=-1, deleteCount=1)
- out-of-range deletion (e.g. deleteCount exceeding the remaining length)
Optional improvements:
- document the supported/unsupported index ranges in the function comment;
- if a small extra allocation is acceptable, always return a new array (use arr.slice(0) even when nothing changes) to remove any concern about the original array reference being shared.
packages/connection/__test__/common/buffers.test.ts (5)
311-315
: The test title does not match the actual data size; fix the wording to avoid confusion. The data sliced here is roughly 1MB (1KB * 1024 iterations), but the case title says "1GB". Only the wording should be corrected, to avoid introducing excessive memory pressure.
Apply the following change to fix the title:
- it('should handle 1GB data slicing', () => { + it('should handle 1MB data slicing', () => {
329-340
: Note on the shared-buffer semantics of slice4. Per the implementation, slice4 returns a reused temporary 4-byte buffer rather than a fresh allocation. The current value-equality assertions are fine, but state in the test or a comment that the returned value must not be kept or mutated; copy it first if it needs to be persisted. Otherwise implicit coupling between later test cases can creep in.
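A small illustration of that copy-before-persist rule; the slice4 call shape here is assumed for illustration rather than taken from the implementation, and create is the helper defined in this test file:

```ts
// Hypothetical usage; slice4's exact signature is assumed for illustration.
const bufs = create([1, 2, 3, 4, 5, 6, 7, 8], [4, 4]); // helper from this test file
const view = bufs.slice4(0, 4);           // returns the shared 4-byte scratch buffer
const persisted = Uint8Array.from(view);  // copy before the next slice4/read4 call if the bytes must be kept
bufs.slice4(4, 8);                        // may overwrite the bytes `view` points at
expect(persisted).toEqual(new Uint8Array([1, 2, 3, 4])); // safe: asserts on the copy
```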
384-391
: The performance threshold is too aggressive and will be flaky on CI. Slicing 1MB repeatedly can occasionally exceed 50ms on resource-constrained or shared CI machines. Relax the threshold, or evaluate against a statistical median, to reduce false positives.
Start by relaxing the threshold slightly:
- expect(duration).toBeLessThan(50); + expect(duration).toBeLessThanOrEqual(150);
Also, if the TypeScript setup does not include the DOM lib, explicitly import Node's performance:
// place at the top of the file (supporting change) import { performance } from 'perf_hooks';
393-406
: The timing assertion for large-scale splice is also tight; add a warm-up or relax it to 2s. 1000 splices vary quite a bit across Node versions and CPUs, and 1s can fail intermittently. Relax the threshold, and optionally add a dry-run warm-up to stabilize the JIT.
Relax the threshold first:
- expect(duration).toBeLessThan(1000); + expect(duration).toBeLessThanOrEqual(2000);
For further stability, run about 50 throwaway splices before starting the timer as a warm-up (supporting change, can be added separately; a sketch follows).
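A minimal warm-up sketch; the create helper follows the signature shown in the next comment's diff, and the splice arguments are illustrative:

```ts
// Warm-up: run a few throwaway splices so the JIT settles before the timed section.
for (let i = 0; i < 50; i++) {
  const warm = create([1, 2, 3, 4, 5, 6, 7, 8], [4, 4]); // helper from this test file
  warm.splice(0, 4); // arguments are illustrative; discard the result, only the work matters
}

const start = performance.now();
// ... the existing 1000-splice loop under test ...
const duration = performance.now() - start;
expect(duration).toBeLessThanOrEqual(2000);
```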
420-431
: createEnhanced overlaps with create; consider merging or simplifying, and avoid the misleading parameter name
- The logic is essentially the same as create, only zero-length slices are skipped. Add a filter in create, or ensure split > 0 at the call sites, to avoid the duplicated code.
- The callback parameter name chunkSize is easy to confuse with the global chunkSize constant (1MB); rename it to len.
Start with the small rename to reduce misreading:
-function createEnhanced(xs: number[], split: number[]): Buffers {
+function createEnhanced(xs: number[], split: number[]): Buffers {
   const bufs = new Buffers();
   let offset = 0;
-  split.forEach((chunkSize) => {
-    if (chunkSize > 0) {
-      const chunk = new Uint8Array(xs.slice(offset, offset + chunkSize));
+  split.forEach((len) => {
+    if (len > 0) {
+      const chunk = new Uint8Array(xs.slice(offset, offset + len));
       bufs.push(chunk);
-      offset += chunkSize;
+      offset += len;
     }
   });
   return bufs;
 }
A further merge could let create accept a { skipZero?: boolean } option.
packages/editor/src/browser/doc-model/large-file-optimizer.ts (2)
22-44
: Some Monaco option types/values look suspect; verify against the Monaco version in use and pick reliable values
- occurrencesHighlight is usually a boolean; setting it to 'off' may not match the type, so prefer false.
- showFoldingControls commonly takes 'always' | 'mouseover' (whether 'never' exists depends on the Monaco version); with folding already false, keeping this option as 'never' adds nothing, so switch it to 'mouseover' or drop it.
- Forcing automaticLayout to false may hurt usability when the container resizes; reconsider whether this should be left to the caller or to the detection logic.
Suggested minimal change:
   optimizedOptions.automaticLayout = false;
   optimizedOptions.renderLineHighlight = 'none';
   optimizedOptions.folding = false;
-  optimizedOptions.showFoldingControls = 'never';
+  optimizedOptions.showFoldingControls = 'mouseover';
@@
-  optimizedOptions.occurrencesHighlight = 'off';
+  optimizedOptions.occurrencesHighlight = false;
If the folding controls really must stay hidden, relying on folding=false alone is also an option.
77-90
: The parameter name does not match its actual use ("bytes" vs "characters"); clarify the semantics to avoid misleading future readers. The caller (base-editor-wrapper) passes content.length (a character/code-unit count), while the parameter here is named fileSizeBytes, which is easy to misread during maintenance. Rename only, to match the current reality (no behavior change), and note in the comment that the value may be an estimated byte or character length.
-export function shouldOptimizeForLargeFile(fileSizeBytes: number, content?: string): boolean {
+export function shouldOptimizeForLargeFile(sizeOrLength: number, content?: string): boolean {
   const SIZE_THRESHOLD = 10 * 1024 * 1024; // 10MB
   const LINE_THRESHOLD = 50000; // 50k 行
-  if (fileSizeBytes > SIZE_THRESHOLD) {
+  if (sizeOrLength > SIZE_THRESHOLD) {
     return true;
   }
If a precise byte count/line count can later be obtained from the model (see the suggestion for the next file), consider introducing more accurate criteria then.
packages/connection/src/common/connection/drivers/stream.ts (1)
1-1
: Prefer a unified logging channel. console use has been allowed via eslint-disable, but wiring into the shared Logger used by the rest of the codebase (e.g. the logger from @opensumi/ide-core-common) would make it easier to consolidate logs and adjust levels.
packages/connection/__test__/node/index.test.ts (1)
65-66
: Updated suggestion on reusing the wrapSerializer instance. WSChannel does not automatically subscribe to the passed-in connection.onMessage during construction, so creating two wrapSerializer instances does not cause double subscription or duplicate dispatch. Still, wrapping the same underlying connection twice in a test adds cognitive overhead; reuse a single wrapped instance for readability and consistency. Key points:
- Call wrapSerializer(wsConnection, furySerializer) only once, and use the result both to construct the WSChannel and for the later manual interception of the server-ready event.
- Make sure the test's server-ready interception keeps working against that same wrapped instance (you can listen via its onMessage right after channel.dispatch).
Recommended optional diff:
- const wsConnection = new WSWebSocketConnection(connection);
- const wrappedConnection = wrapSerializer(wsConnection, furySerializer);
- const channel = new WSChannel(wrapSerializer(wsConnection, furySerializer), {
+ const wsConnection = new WSWebSocketConnection(connection);
+ const wrappedConnection = wrapSerializer(wsConnection, furySerializer);
+ const channel = new WSChannel(wrappedConnection, {
    id: 'TEST_CHANNEL_ID',
  });
packages/connection/__test__/browser/index.test.ts (1)
24-35
: send now returns a Promise; await it on the test side for determinism. WSWebSocketConnection.send now sends through an async queue. Not awaiting still works today, but under heavy load or in slow environments it can introduce occasional timing instability. Await when the mock server replies:
- connection.send(
+ await connection.send(
    furySerializer.serialize({
      id: msgObj.id,
      kind: 'server-ready',
      traceId: '',
    }),
  );
packages/connection/src/common/connection/drivers/ws-websocket.ts (2)
27-30
: Inbound data-type compatibility note. The data of ws's message event may be a Buffer, ArrayBuffer, Uint8Array, or string; the code currently assumes Buffer. Optional hardening: ignore strings or explicitly convert them to Uint8Array for robustness (not required).
- this.socket.on('message', this.onSocketMessage); + this.socket.on('message', (data) => { + if (typeof data === 'string') return; // or: this.decoder.push(Buffer.from(data)) + this.onSocketMessage(data as Buffer); + });
82-96
: The queue is only bounded by item count; add a byte-based cap on pending size. pendingSize is already tracked but not used for flow control. To keep large payloads from exhausting memory, add a threshold such as MAX_PENDING_BYTES (64MB, say) and reject or pause enqueueing once it is exceeded. A sketch of such a guard follows.
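A minimal, standalone sketch of that byte-based guard; the type shape, constant name, and limit are illustrative rather than taken from the driver:

```ts
// Minimal sketch: byte-based cap for a pending send queue (names and limit are illustrative).
type QueuedSend = { data: Uint8Array; resolve: () => void; reject: (e: Error) => void };

const MAX_PENDING_BYTES = 64 * 1024 * 1024; // 64MB cap on queued-but-unsent bytes

function tryEnqueue(queue: QueuedSend[], pending: { bytes: number }, item: QueuedSend): boolean {
  if (pending.bytes + item.data.byteLength > MAX_PENDING_BYTES) {
    item.reject(new Error('Send queue exceeds MAX_PENDING_BYTES'));
    return false; // the caller could also pause the producer instead of rejecting
  }
  pending.bytes += item.data.byteLength;
  queue.push(item);
  return true;
}
```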
packages/connection/__test__/common/frame-decoder.test.ts (3)
117-129
: Minor mismatch between the test name and its semantics. The case name "has no valid length info" does not match what is actually constructed (a frame that does include the length field). Rename it to avoid confusion, e.g. "can decode a single framed payload".
156-177
: Make the 1000-small-packet stress case converge on events to reduce flakiness. It currently waits via setTimeout(1000), which can be unstable on slow CI machines. Resolve once the count reaches 1000, then assert:
- // 等待处理完成 - await new Promise((resolve) => setTimeout(resolve, 1000)); - expect(received.length).toBe(1000); + await new Promise<void>((resolve) => { + const target = 1000; + decoder.onData(() => { + if (received.length >= target) resolve(); + }); + }); + expect(received.length).toBeGreaterThanOrEqual(1000);
180-199
: Memory-stability case: the threshold is environment-dependent, so leave headroom and mark it as a slow test. Such cases can fluctuate across CI machine types, and the current 5MB threshold is tight. Add a comment or an environment switch (e.g. run only in long tests/locally), or relax the threshold moderately.
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (3)
20-28
: The constructor's handling of blob is too aggressive; force arraybuffer internally instead of throwing. Currently the listener is only bound when binaryType === 'arraybuffer', and binaryType === 'blob' throws. Since forURL already sets arraybuffer, a safer approach is to set binaryType again in the constructor as a fallback, avoiding runtime exceptions if callers extend or subclass this in the future.
Set it proactively in the constructor while keeping the existing logic:
 protected constructor(private socket: ReconnectingWebSocket) {
   super();
-
-  if (socket.binaryType === 'arraybuffer') {
+  // Force arraybuffer as a fallback to avoid runtime differences
+  this.socket.binaryType = 'arraybuffer';
+  if (this.socket.binaryType === 'arraybuffer') {
     this.socket.addEventListener('message', this.arrayBufferHandler);
   } else if (socket.binaryType === 'blob') {
     throw new Error('blob is not implemented');
   }
 }
53-55
: The caught error is not narrowed, while the reject signature expects an Error, so there is a type-mismatch risk. With useUnknownInCatchVariables enabled, the caught error is unknown. Narrow it to Error or wrap it in one.
- console.error('[ReconnectingWebSocket] Error sending data:', error); - reject(error); + console.error('[ReconnectingWebSocket] Error sending data:', error); + reject(error instanceof Error ? error : new Error(String(error)));
86-88
: Switching onMessage to decoder-based callback binding is a good change in itself, but the multi-subscriber strategy needs to be settled. onData currently supports only a single listener (a later registration overwrites the earlier one). If upper layers need multiple subscriptions, either document this clearly or add multicast support in the decoder. Is multicast needed? If so, I can help turn onData into a tiny multi-listener emitter, along the lines of the sketch below.
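A minimal sketch of such a multicast onData, independent of the decoder's actual internals; the listener storage and dispose shape are assumptions:

```ts
// Minimal multicast emitter sketch; not the decoder's actual API.
type DataListener = (data: Uint8Array) => void;

class MulticastOnData {
  private listeners = new Set<DataListener>();

  onData(listener: DataListener): { dispose: () => void } {
    this.listeners.add(listener);
    return { dispose: () => this.listeners.delete(listener) };
  }

  protected emit(data: Uint8Array): void {
    for (const listener of this.listeners) {
      try {
        listener(data); // isolate listener errors so one bad subscriber cannot break the rest
      } catch (error) {
        console.error('[Frame Decoder] listener error:', error);
      }
    }
  }
}
```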
packages/connection/src/common/connection/drivers/frame-decoder.ts (3)
116-122
: No need to await the downstream listener; fire-and-forget would improve decode throughput. The call to this._onDataListener currently uses await Promise.resolve().then(...), which throttles decoding of subsequent frames behind the listener's async chain. If the business logic allows listeners to process asynchronously, do not block the decode pipeline.
- if (this._onDataListener) {
-   try {
-     await Promise.resolve().then(() => this._onDataListener?.(binary));
-   } catch (error) {
-     console.error('[Frame Decoder] Error in data listener:', error);
-   }
- }
+ if (this._onDataListener) {
+   try {
+     // Fire asynchronously without blocking the main decode loop
+     Promise.resolve().then(() => this._onDataListener?.(binary)).catch((error) => {
+       console.error('[Frame Decoder] Error in data listener:', error);
+     });
+   } catch (error) {
+     console.error('[Frame Decoder] Error in data listener (sync):', error);
+   }
+ }
If strict per-frame ordering without concurrency must be preserved, feel free to ignore this suggestion.
210-214
: The dispose ordering and semantics are OK, but add a comment so future maintainers understand the Cursor/Buffer lifecycle. Calling buffers.dispose() first and reset() afterwards (which internally calls cursor.reset()) is safe, but easy to misread. Add a one-line comment noting that reset only clears cursor state and never touches the already-released underlying buffers.
216-229
: Creating a new writer on every construct call is the right approach; include content.length in the error message to help locate bad data. A small improvement that makes abnormal frames easier to diagnose in production.
- } catch (error) { - console.warn('[Frame Decoder] Error constructing frame:', error); + } catch (error) { + console.warn('[Frame Decoder] Error constructing frame:', error, 'contentLength=', content?.byteLength); throw error; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (18)
packages/components/src/recycle-tree/tree/TreeNode.ts (2 hunks)
packages/connection/__test__/browser/index.test.ts (2 hunks)
packages/connection/__test__/common/buffers.test.ts (2 hunks)
packages/connection/__test__/common/frame-decoder.test.ts (4 hunks)
packages/connection/__test__/node/index.test.ts (2 hunks)
packages/connection/__test__/node/ws-channel.test.ts (3 hunks)
packages/connection/src/common/buffers/buffers.ts (3 hunks)
packages/connection/src/common/connection/drivers/frame-decoder.ts (6 hunks)
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (3 hunks)
packages/connection/src/common/connection/drivers/stream.ts (2 hunks)
packages/connection/src/common/connection/drivers/ws-websocket.ts (2 hunks)
packages/connection/src/common/constants.ts (1 hunks)
packages/connection/src/common/fury-extends/one-of.ts (1 hunks)
packages/connection/src/node/common-channel-handler.ts (1 hunks)
packages/core-browser/__tests__/bootstrap/connection.test.ts (2 hunks)
packages/editor/src/browser/base-editor-wrapper.ts (2 hunks)
packages/editor/src/browser/doc-model/large-file-optimizer.ts (1 hunks)
packages/file-scheme/src/browser/file-scheme.contribution.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (12)
packages/connection/src/node/common-channel-handler.ts (1)
  packages/connection/src/common/connection/drivers/ws-websocket.ts (1): WSWebSocketConnection (17-124)
packages/connection/src/common/connection/drivers/stream.ts (1)
  packages/connection/src/common/connection/drivers/frame-decoder.ts (1): LengthFieldBasedFrameDecoder (26-230)
packages/connection/__test__/browser/index.test.ts (1)
  packages/connection/src/common/connection/drivers/ws-websocket.ts (1): WSWebSocketConnection (17-124)
packages/editor/src/browser/base-editor-wrapper.ts (1)
  packages/editor/src/browser/doc-model/large-file-optimizer.ts (2): shouldOptimizeForLargeFile (77-90), getLargeFileOptimizedEditorOptions (16-72)
packages/connection/__test__/common/buffers.test.ts (2)
  packages/connection/src/common/buffers/buffers.ts (3): Buffers (20-230), slice (38-74), cursor (222-224)
  packages/connection/src/common/constants.ts (1): chunkSize (6-6)
packages/editor/src/browser/doc-model/large-file-optimizer.ts (1)
  packages/monaco/src/browser/monaco-api/editor.ts (1): IEditorOptions (132-132)
packages/connection/__test__/node/index.test.ts (4)
  packages/connection/src/common/serializer/index.ts (1): wrapSerializer (9-24)
  packages/connection/src/common/serializer/fury.ts (1): furySerializer (67-67)
  packages/connection/src/common/ws-channel.ts (1): WSChannel (96-322)
  packages/connection/src/common/channel/types.ts (1): ChannelMessage (1-9)
packages/core-browser/__tests__/bootstrap/connection.test.ts (1)
  packages/utils/src/async.ts (1): sleep (603-605)
packages/connection/src/common/connection/drivers/ws-websocket.ts (2)
  packages/connection/src/common/connection/drivers/frame-decoder.ts (1): LengthFieldBasedFrameDecoder (26-230)
  packages/connection/src/common/constants.ts (1): chunkSize (6-6)
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (2)
  packages/connection/src/common/connection/drivers/frame-decoder.ts (1): LengthFieldBasedFrameDecoder (26-230)
  packages/connection/src/common/constants.ts (1): chunkSize (6-6)
packages/connection/__test__/common/frame-decoder.test.ts (1)
  packages/connection/src/common/connection/drivers/frame-decoder.ts (1): LengthFieldBasedFrameDecoder (26-230)
packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
  packages/connection/src/common/buffers/buffers.ts (1): Buffers (20-230)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: build (macos-latest, 20.x)
- GitHub Check: build (ubuntu-latest, 20.x)
- GitHub Check: unittest (ubuntu-latest, 18.x, node)
- GitHub Check: unittest (macos-latest, 18.x, node)
- GitHub Check: unittest (ubuntu-latest, 18.x, jsdom)
- GitHub Check: ubuntu-latest, Node.js 20.x
- GitHub Check: unittest (macos-latest, 18.x, jsdom)
🔇 Additional comments (21)
packages/components/src/recycle-tree/tree/TreeNode.ts (1)
579-583
: Doc-comment polish reviewed; the wording is clearer and there is no behavior change. The ensureLoaded comment received punctuation and wording touch-ups; keeping it consistent is all that is needed.
packages/connection/src/common/constants.ts (1)
3-6
: The chunk-size constant is well defined. 1MB is a sensible chunk size: it meaningfully reduces memory pressure from large messages without the extra network overhead of chunks that are too small. The constant is clearly named and well commented.
packages/connection/src/common/fury-extends/one-of.ts (1)
71-73
: Strengthened error handling in deserialization. Adding a default branch for unknown indices is a good call: it makes the code more robust and avoids silent failures when an invalid index is received. Including the concrete index in the error message helps debugging.
packages/core-browser/__tests__/bootstrap/connection.test.ts (2)
3-3
: Use of the standardized sleep function. Importing sleep from the shared utility library instead of a hand-rolled Promise is a nice improvement for consistency and maintainability.
38-40
: Simplified async delay logic. Using sleep(4000).then(...) instead of manual Promise construction makes the code more concise and readable, and keeps it consistent with the imported library function.
packages/file-scheme/src/browser/file-scheme.contribution.ts (2)
151-151
: The warning threshold is reasonable. 50MB is an appropriate warning point for most text files, alerting the user before performance problems kick in.
153-174
: Layered design of the large-file handling logic. The new two-tier check gives a better user experience:
- the 50MB warning threshold gives users an early warning;
- the maxSize limit enforces a hard cap for truly large files.
This layering gives users different handling options at different file sizes, with clear logic and thorough comments. A sketch of the two-tier shape follows below.
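A minimal sketch of that two-tier shape, with illustrative names, messages, and thresholds rather than the contribution's actual code:

```ts
// Illustrative two-tier large-file check (names and messages are hypothetical).
const WARN_SIZE = 50 * 1024 * 1024; // early warning threshold

async function checkBeforeOpen(
  size: number,
  maxSize: number,
  confirm: (msg: string) => Promise<boolean>,
): Promise<boolean> {
  if (size > maxSize) {
    // hard cap: refuse to open as a normal text document
    throw new Error(`File exceeds the configured maxSize (${maxSize} bytes)`);
  }
  if (size > WARN_SIZE) {
    // soft cap: warn and let the user decide
    return confirm('This file is larger than 50MB and may be slow to open. Continue?');
  }
  return true;
}
```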
packages/connection/src/common/buffers/buffers.ts (3)
8-8
: The shared 4-byte buffer reduces allocations. Introducing buffer4Capacity as a shared 4-byte buffer is a good optimization that avoids frequent small allocations, and it fits the frame-decoding case of reading the length field especially well.
76-107
: Efficient 4-byte slice implementation. slice4 reuses the existing slicing logic nicely and avoids allocation via the shared buffer4Capacity. Boundary cases and cross-buffer reads are handled correctly.
305-309
: Convenient 4-byte read. read4 offers a concise API that reads 4 bytes and advances the cursor automatically, which is very handy for frame decoding. It correctly combines the slice4 and skip operations; a small standalone sketch of the pattern follows.
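A standalone sketch of the read4/skip pattern this comment describes; it deliberately does not use the repo's Buffers API, whose exact signatures are not quoted here:

```ts
// Standalone sketch of a read-4-bytes-and-advance cursor (not the repo's Buffers API).
class ByteCursor {
  private offset = 0;
  constructor(private readonly bytes: Uint8Array, private readonly scratch = new Uint8Array(4)) {}

  read4(): Uint8Array {
    // copy 4 bytes into a shared scratch buffer and advance the cursor
    this.scratch.set(this.bytes.subarray(this.offset, this.offset + 4));
    this.offset += 4;
    return this.scratch; // shared: callers must copy before persisting
  }
}

const cursor = new ByteCursor(new Uint8Array([4, 0, 0, 0, 0xde, 0xad, 0xbe, 0xef]));
const len = new DataView(cursor.read4().buffer).getUint32(0, true); // 4 (little-endian)
```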
packages/editor/src/browser/doc-model/large-file-optimizer.ts (1)
46-71
: The option toggles are cleanly designed and well decoupled, easy to compose on demand. Splitting the disable switches by feature (wrap/minimap/hover/codelens/suggest, and so on) makes them highly reusable and easy to extend later.
packages/editor/src/browser/base-editor-wrapper.ts (1)
183-207
: Avoid reading the full text when computing options; use model metrics (line count/length) to cut large-file overhead. Fetching the whole document via getText() in _calculateFinalOptions forces a one-off large string allocation and GC pressure for very large files, which runs against the goal of "optimizing large files". Prefer Monaco model metrics: getLineCount() and, if available, getValueLength(), avoiding construction of the full string; also compare thresholds in the same unit of measure.
Reference change (keeps the original semantics and thresholds while reducing memory usage):
- if (this.currentDocumentModel && this.currentUri) {
-   const content = this.currentDocumentModel.getText();
-   const contentLength = content.length;
-
-   // 检查是否需要优化
-   const needsOptimization = shouldOptimizeForLargeFile(contentLength, content);
+ if (this.currentDocumentModel && this.currentUri) {
+   const model = this.monacoEditor.getModel();
+   // Prefer model metrics; avoid building a huge string
+   const charLength =
+     (model && typeof (model as any).getValueLength === 'function'
+       ? (model as any).getValueLength()
+       : this.currentDocumentModel.getText().length);
+   const lineCount = model?.getLineCount?.();
+
+   // Combine a length threshold with a line-count threshold; do not treat character count as bytes
+   const needsOptimization =
+     shouldOptimizeForLargeFile(charLength) || (!!lineCount && lineCount > 50_000);
@@
-   const optimizedEditorOptions = getLargeFileOptimizedEditorOptions(editorOptions, {
+   const optimizedEditorOptions = getLargeFileOptimizedEditorOptions(editorOptions, {
      optimizedMode: true,
      disableSemanticHighlighting: true,
      disableWordWrap: true,
      disableMinimap: true,
-     disableHover: contentLength > 20 * 1024 * 1024, // 20MB以上禁用hover
+     disableHover: charLength > 20 * 1024 * 1024, // roughly a 20MB character threshold
      disableCodeLens: true,
-     disableQuickSuggestions: contentLength > 30 * 1024 * 1024, // 30MB以上禁用建议
+     disableQuickSuggestions: charLength > 30 * 1024 * 1024, // roughly a 30MB character threshold
    });
getValueLength()
的可用性需在当前 @opensumi/monaco 版本中确认;如不可用,上述代码已回退到原有getText().length
。- 可考虑在
onDidChangeModel
处缓存一次“是否大文件”的判断,减少后续_doUpdateOptions
的重复计算。packages/connection/src/node/common-channel-handler.ts (1)
45-46
: 内联创建 WSWebSocketConnection 简化了连接接入路径改动更简洁,减少了临时变量,功能等价。保持现状即可。
packages/connection/__test__/node/ws-channel.test.ts (2)
88-89
: Sending distinct text payloads is closer to real-world use. Changing the N sends from a fixed 'hello' to 'hello' + i makes reordering and packet-coalescing issues easier to expose; good direction.
98-99
: Moderately increased timeouts improve stability. Raising the two timeouts from 20s to 30s mitigates false failures from occasional slow machines or resource contention. Fine to keep.
Also applies to: 152-153
packages/connection/__test__/common/frame-decoder.test.ts (5)
38-41
: Using .dump() to obtain the frame bytes aligns with the new API, 👍. Switching from the constructor returning a writer to fetching the actual frame bytes via .dump() is correct and semantically clear.
73-87
: Single-packet decode case waits for onData asynchronously, 👍. Wrapping onData in a Promise avoids races and matches the decoder's async processing model.
89-116
: Multi-frame concatenation case covers ordering and integrity, 👍. Pushing several concatenated frames at once exercises the iterative parsing logic; the test granularity is reasonable.
132-155
: Chunking that splits the indicator covers edge cases, 👍. Verifying correct reassembly when the indicator and the length field span chunks is valuable.
202-213
: Zero-length payload case covers the boundary, 👍
packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
149-159
: Double-check the endianness of the length-field read and write to avoid a mismatch with the writer's uint32 encoding. The read side interprets the 4-byte length with readUInt32LE (little-endian), while construct uses writer.uint32(content.byteLength). Confirm whether Fury's BinaryWriter.uint32 defaults to little-endian; if it is big-endian, decoding will break. I can check the latest docs, or you can verify quickly locally: build a frame and read the 4 length bytes directly to see whether the byte order matches the read side, as in the sketch below.
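A minimal local probe along those lines; it assumes LengthFieldBasedFrameDecoder is imported as in the tests quoted below, and scans offsets instead of assuming where the length field sits:

```ts
// Quick endianness probe; construct(...).dump() follows the usage in the tests below.
const content = new Uint8Array([1, 2, 3]);
const packet = LengthFieldBasedFrameDecoder.construct(content).dump();

const view = new DataView(packet.buffer, packet.byteOffset, packet.byteLength);
for (let offset = 0; offset + 4 <= packet.byteLength; offset++) {
  const le = view.getUint32(offset, true);   // little-endian reading
  const be = view.getUint32(offset, false);  // big-endian reading
  if (le === content.byteLength || be === content.byteLength) {
    console.log(`candidate length field at offset ${offset}: LE=${le}, BE=${be}`);
  }
}
```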
it('should handle concurrent pushes', async () => {
  const decoder = new LengthFieldBasedFrameDecoder();
  const content = new Uint8Array([1, 2, 3]);
  const packet = LengthFieldBasedFrameDecoder.construct(content).dump();

  const chunk1 = packet.subarray(0, 4);
  const chunk2 = packet.subarray(4);

  const resultPromise = new Promise<Uint8Array>((resolve) => {
    decoder.onData(resolve);
  });

  // 同时推送两个chunk
  await Promise.all([decoder.push(chunk1), decoder.push(chunk2)]);

  const result = await resultPromise;
  fastExpectBufferEqual(result, content);
});
🛠️ Refactor suggestion
The concurrent-push test has a type/semantic problem: push returns void, so Promise.all is a no-op.
decoder.push returns void, so await Promise.all([decoder.push(...), ...]) does not wait for any asynchronous behavior and raises a type error under strict TS. Either call push back-to-back, or simulate concurrency with microtasks:
- // 同时推送两个chunk
- await Promise.all([decoder.push(chunk1), decoder.push(chunk2)]);
+ // push both chunks back-to-back (push returns no Promise)
+ decoder.push(chunk1);
+ decoder.push(chunk2);
Or:
+ await Promise.all([
+ Promise.resolve().then(() => decoder.push(chunk1)),
+ Promise.resolve().then(() => decoder.push(chunk2)),
+ ]);
📝 Committable suggestion
it('should handle concurrent pushes', async () => {
  const decoder = new LengthFieldBasedFrameDecoder();
  const content = new Uint8Array([1, 2, 3]);
  const packet = LengthFieldBasedFrameDecoder.construct(content).dump();
  const chunk1 = packet.subarray(0, 4);
  const chunk2 = packet.subarray(4);
  const resultPromise = new Promise<Uint8Array>((resolve) => {
    decoder.onData(resolve);
  });
  // 同时推送两个 chunk(无返回 Promise)
  decoder.push(chunk1);
  decoder.push(chunk2);
  const result = await resultPromise;
  fastExpectBufferEqual(result, content);
});
🤖 Prompt for AI Agents
In packages/connection/__test__/common/frame-decoder.test.ts around lines
215-232, the test uses await Promise.all([decoder.push(chunk1),
decoder.push(chunk2)]) but decoder.push returns void so Promise.all is
ineffective and causes a TS type error; update the test to either call
decoder.push(chunk1); decoder.push(chunk2); synchronously before awaiting the
result promise, or simulate concurrent invocation by wrapping the pushes in
microtasks like Promise.resolve().then(() => decoder.push(chunk1));
Promise.resolve().then(() => decoder.push(chunk2)); so the pushes run
"concurrently" without changing decoder.push's signature.
private async processBuffers(): Promise<void> {
  let iterations = 0;
  let hasMoreData = false;

  do {
    hasMoreData = false;
    while (iterations < LengthFieldBasedFrameDecoder.MAX_ITERATIONS) {
      if (this.buffers.byteLength === 0) {
        break;
      }

      const result = await this.readFrame();
      if (result === true) {
        break;
      }

      iterations++;
      if (iterations % 10 === 0) {
        await new Promise((resolve) => setTimeout(resolve, 0));
      }
    }

    // 检查剩余数据
    if (this.buffers.byteLength > 0) {
      hasMoreData = true;
      // 异步继续处理,避免阻塞
      await new Promise((resolve) => setTimeout(resolve, 0));
      iterations = 0; // 重置迭代计数器
    }
  } while (hasMoreData);
}
When no new data arrives and the buffered bytes are less than one frame, the loop spins, causing endless async retries and event-loop churn.
After the inner loop of processBuffers breaks because readFrame() returned true (not enough data), the outer loop sets hasMoreData = true and continues via setTimeout(0) as long as this.buffers.byteLength > 0, so without new data it spins repeatedly and wastes CPU.
Record whether this round made progress (buffered bytes decreased or the cursor advanced) before deciding to continue:
private async processBuffers(): Promise<void> {
let iterations = 0;
let hasMoreData = false;
do {
hasMoreData = false;
+ const beforeByteLength = this.buffers.byteLength;
+ const beforeOffset = this.cursor.offset;
while (iterations < LengthFieldBasedFrameDecoder.MAX_ITERATIONS) {
if (this.buffers.byteLength === 0) {
break;
}
const result = await this.readFrame();
if (result === true) {
break;
}
iterations++;
if (iterations % 10 === 0) {
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
- // 检查剩余数据
- if (this.buffers.byteLength > 0) {
+ // Check remaining data and require progress this round (avoid spinning while a frame is incomplete)
+ const progressed = this.buffers.byteLength < beforeByteLength || this.cursor.offset > beforeOffset;
+ if (this.buffers.byteLength > 0 && progressed) {
hasMoreData = true;
// 异步继续处理,避免阻塞
await new Promise((resolve) => setTimeout(resolve, 0));
iterations = 0; // 重置迭代计数器
}
} while (hasMoreData);
}
📝 Committable suggestion
private async processBuffers(): Promise<void> {
  let iterations = 0;
  let hasMoreData = false;
  do {
    hasMoreData = false;
    // 记录本轮开始时的状态,用于检测是否有进展
    const beforeByteLength = this.buffers.byteLength;
    const beforeOffset = this.cursor.offset;
    while (iterations < LengthFieldBasedFrameDecoder.MAX_ITERATIONS) {
      if (this.buffers.byteLength === 0) {
        break;
      }
      const result = await this.readFrame();
      if (result === true) {
        break;
      }
      iterations++;
      if (iterations % 10 === 0) {
        await new Promise((resolve) => setTimeout(resolve, 0));
      }
    }
    // 仅在仍有残余数据且本轮处理有进展时继续
    const progressed =
      this.buffers.byteLength < beforeByteLength ||
      this.cursor.offset > beforeOffset;
    if (this.buffers.byteLength > 0 && progressed) {
      hasMoreData = true;
      // 异步继续处理,避免阻塞
      await new Promise((resolve) => setTimeout(resolve, 0));
      iterations = 0; // 重置迭代计数器
    }
  } while (hasMoreData);
}
🤖 Prompt for AI Agents
In packages/connection/src/common/connection/drivers/frame-decoder.ts around
lines 64 to 94, processBuffers can busy-loop when buffers contain partial frame
data but no new bytes arrive because it always sets hasMoreData=true if
byteLength>0; modify logic to detect actual progress during a full outer
iteration (e.g., capture starting buffers.byteLength or read cursor before the
inner loop and compare after): only set hasMoreData and continue the outer loop
if progress was made (byteLength decreased or cursor advanced); if no progress,
await an explicit new-data notification (or return/break) instead of scheduling
an immediate setTimeout(0), and ensure iterations and any backoff are reset when
progress happens.
private async processSendQueue() {
  if (this.sending) {
    return;
  }
  this.sending = true;

  while (this.sendQueue.length > 0) {
    const { data, resolve, reject } = this.sendQueue[0];
    let handle: { get: () => Uint8Array; dispose: () => void } | null = null;

    try {
      handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
      const packet = handle.get();

      for (let i = 0; i < packet.byteLength; i += chunkSize) {
        await new Promise<void>((resolve) => {
          const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
          this.socket.send(chunk);
          resolve();
        });
      }

      resolve();
    } catch (error) {
      console.error('[ReconnectingWebSocket] Error sending data:', error);
      reject(error);
    } finally {
      if (handle) {
        handle.dispose();
      }
    }
    this.sendQueue.shift();
  }

  this.sending = false;
}
💡 Verification agent
❓ Verification inconclusive
The send queue has a no-op await and lacks backpressure control, which can cause unnecessary event-loop churn and memory growth.
- Inside the for loop, await new Promise(resolve => { send(); resolve(); }) completes synchronously; it is equivalent to an idle await and only adds microtask switching.
- There is no backpressure based on socket.bufferedAmount, so sending large payloads back to back (even chunked) can still pile up in the browser buffer, raising memory peaks and latency.
- The code does not ensure the connection is OPEN before sending; if the underlying layer does not queue, this may throw or drop data (depending on the ReconnectingWebSocket implementation).
Suggestions:
- Remove the no-op await; add simple backpressure: when bufferedAmount exceeds a high-water mark, yield the event loop until it falls.
- Wait for OPEN before sending, or abort the in-flight send and reject on CLOSE.
Pressure-test locally with large data (>10MB), comparing memory and send time before and after, to confirm the backpressure takes effect.
Apply the following diff (with minimal backpressure and an OPEN wait):
private async processSendQueue() {
if (this.sending) {
return;
}
- this.sending = true;
-
- while (this.sendQueue.length > 0) {
- const { data, resolve, reject } = this.sendQueue[0];
- let handle: { get: () => Uint8Array; dispose: () => void } | null = null;
-
- try {
- handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
- const packet = handle.get();
-
- for (let i = 0; i < packet.byteLength; i += chunkSize) {
- await new Promise<void>((resolve) => {
- const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
- this.socket.send(chunk);
- resolve();
- });
- }
-
- resolve();
- } catch (error) {
- console.error('[ReconnectingWebSocket] Error sending data:', error);
- reject(error);
- } finally {
- if (handle) {
- handle.dispose();
- }
- }
- this.sendQueue.shift();
- }
-
- this.sending = false;
+ this.sending = true;
+ try {
+ while (this.sendQueue.length > 0) {
+ const { data, resolve, reject } = this.sendQueue[0];
+ let handle: { get: () => Uint8Array; dispose: () => void } | null = null;
+ try {
+ // Wait for OPEN when the connection is not ready; abort if it closes in the meantime
+ if (!this.isOpen()) {
+ await new Promise<void>((resolveOpen, rejectOpen) => {
+ const d1 = this.onOpen(() => {
+ d1.dispose();
+ d2.dispose();
+ resolveOpen();
+ });
+ const d2 = this.onceClose((code, reason) => {
+ d1.dispose();
+ d2.dispose();
+ rejectOpen(new Error(`WebSocket closed before send: code=${code}, reason=${reason ?? ''}`));
+ });
+ });
+ }
+
+ handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
+ const packet = handle.get();
+
+ for (let i = 0; i < packet.byteLength; i += chunkSize) {
+ const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
+ this.socket.send(chunk);
+ // Simple backpressure: yield the event loop when the buffer is too high
+ while (this.socket.bufferedAmount > WS_BUFFERED_HWM) {
+ await new Promise((r) => setTimeout(r, 0));
+ }
+ }
+ resolve();
+ } catch (error) {
+ console.error('[ReconnectingWebSocket] Error sending data:', error);
+ reject(error instanceof Error ? error : new Error(String(error)));
+ } finally {
+ if (handle) {
+ handle.dispose();
+ }
+ }
+ this.sendQueue.shift();
+ }
+ } finally {
+ this.sending = false;
+ }
}
And add a high-water-mark constant at the top of the file (or centralize it in constants.ts):
+// Simple backpressure high-water mark: 8MB (tune per scenario, or import from ../../constants)
+const WS_BUFFERED_HWM = 8 * 1024 * 1024;
Please refactor the processSendQueue method: remove the no-op await, and add backpressure control plus connection-state checks.
To avoid performance jitter from pointless microtask scheduling, and the memory spikes and potential packet loss caused by an unbounded browser send buffer, improve the following:
• File path: packages/connection/src/common/connection/drivers/reconnecting-websocket.ts → start of the processSendQueue method (roughly lines 30–66)
• Problems:
- Inside the for loop, `await new Promise<void>(resolve => { this.socket.send(chunk); resolve(); });` resolves right after socket.send; it performs no real asynchronous wait and only adds microtask-switching overhead.
- There is no backpressure based on this.socket.bufferedAmount, so sending large data continuously (even chunked) lets the browser-side buffer grow without bound, risking memory peaks and extra latency.
- The code does not ensure the WebSocket is OPEN before sending; calling send while CONNECTING or CLOSING/CLOSED may throw or drop data (depending on the underlying implementation).
• Suggested change (minimal viable backpressure and state-wait example)
Add a high-water-mark constant at the top of the file:
+// Simple backpressure high-water mark: 8MB (tune per scenario, or import shared constants)
And refactor processSendQueue to:
private async processSendQueue() {
if (this.sending) {
return;
}
- this.sending = true;
-
- while (this.sendQueue.length > 0) {
- const { data, resolve, reject } = this.sendQueue[0];
- let handle: { get: () => Uint8Array; dispose: () => void } | null = null;
-
- try {
- handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
- const packet = handle.get();
-
- for (let i = 0; i < packet.byteLength; i += chunkSize) {
- await new Promise<void>((resolve) => {
- const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
- this.socket.send(chunk);
- resolve();
- });
- }
-
- resolve();
- } catch (error) {
- console.error('[ReconnectingWebSocket] Error sending data:', error);
- reject(error);
- } finally {
- if (handle) {
- handle.dispose();
- }
- }
- this.sendQueue.shift();
- }
-
- this.sending = false;
+ this.sending = true;
+ try {
+ while (this.sendQueue.length > 0) {
+ const { data, resolve, reject } = this.sendQueue[0];
+ let handle: { get: () => Uint8Array; dispose: () => void } | null = null;
+ try {
+ // Wait for OPEN; if the socket closes in the meantime, abort and reject
+ if (!this.isOpen()) {
+ await new Promise<void>((resolveOpen, rejectOpen) => {
+ const d1 = this.onOpen(() => {
+ d1.dispose();
+ d2.dispose();
+ resolveOpen();
+ });
+ const d2 = this.onceClose((code, reason) => {
+ d1.dispose();
+ d2.dispose();
+ rejectOpen(new Error(`WebSocket closed before send: code=${code}, reason=${reason || ''}`));
+ });
+ });
+ }
+
+ handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
+ const packet = handle.get();
+
+ for (let i = 0; i < packet.byteLength; i += chunkSize) {
+ const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
+ this.socket.send(chunk);
+ // Backpressure: yield the event loop when the buffer exceeds the high-water mark
+ while (this.socket.bufferedAmount > WS_BUFFERED_HWM) {
+ await new Promise((r) => setTimeout(r, 0));
+ }
+ }
+ resolve();
+ } catch (error) {
+ console.error('[ReconnectingWebSocket] Error sending data:', error);
+ reject(error instanceof Error ? error : new Error(String(error)));
+ } finally {
+ if (handle) {
+ handle.dispose();
+ }
+ }
+ this.sendQueue.shift();
+ }
+ } finally {
+ this.sending = false;
+ }
}
send(data: Uint8Array): Promise<void> {
  return new Promise((resolve, reject) => {
    this.sendQueue.push({ data, resolve, reject });
    this.processSendQueue();
  });
}
send Promises are not rejected when the instance is disposed, which can leave callers hanging.
dispose currently clears the queue but does not reject pending send Promises, so upstream awaits never return.
Reject each pending Promise in dispose, and handle CLOSE as a fallback in the send path:
send(data: Uint8Array): Promise<void> {
return new Promise((resolve, reject) => {
this.sendQueue.push({ data, resolve, reject });
this.processSendQueue();
});
}
Handle it in dispose (see the diff in the next comment).
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In packages/connection/src/common/connection/drivers/reconnecting-websocket.ts
around lines 67 to 72, the send() method enqueues a Promise but dispose()
currently only clears the queue without rejecting those pending Promises causing
callers to hang; update dispose() to iterate over this.sendQueue and call each
item's reject with a clear Error (e.g. "connection disposed") and then clear the
queue, and also update the send processing logic to treat the WebSocket CLOSE
state as a terminal error by rejecting pending send Promises (or immediately
rejecting new sends) when socket state is CLOSED so no send Promise remains
unresolved.
dispose(): void {
  // do nothing
  this.socket.removeEventListener('message', this.arrayBufferHandler);
  this.sendQueue = [];
  this.sending = false;
}
dispose does not release the decoder and does not reject pending sends in the queue, leaking memory and logic.
- this.decoder.dispose() is never called, so the internal Buffers may hold memory indefinitely.
- The queue is only cleared, not rejected, leaving upstream awaits hanging.
Apply the following diff:
dispose(): void {
- this.socket.removeEventListener('message', this.arrayBufferHandler);
- this.sendQueue = [];
- this.sending = false;
+ this.socket.removeEventListener('message', this.arrayBufferHandler);
+ // Reject all pending sends
+ const err = new Error('ReconnectingWebSocketConnection disposed');
+ for (const item of this.sendQueue) {
+ try {
+ item.reject(err);
+ } catch {}
+ }
+ this.sendQueue = [];
+ this.sending = false;
+ // Release decoder resources
+ this.decoder.dispose();
}
📝 Committable suggestion
dispose(): void {
  this.socket.removeEventListener('message', this.arrayBufferHandler);
  // 拒绝所有未完成发送
  const err = new Error('ReconnectingWebSocketConnection disposed');
  for (const item of this.sendQueue) {
    try {
      item.reject(err);
    } catch {}
  }
  this.sendQueue = [];
  this.sending = false;
  // 释放解码器资源
  this.decoder.dispose();
}
🤖 Prompt for AI Agents
In packages/connection/src/common/connection/drivers/reconnecting-websocket.ts
around lines 132 to 136, dispose() currently removes the message listener and
clears sendQueue without disposing the decoder or rejecting any pending send
promises; update dispose() to call this.decoder.dispose() (if decoder exists)
and iterate the sendQueue to reject each pending send with an explicit Error
(e.g. 'disposed'), then clear the queue and set sending=false; also defensively
null out references (socket, decoder) after cleanup to avoid retained memory.
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
try {
  this.writable.write(handle.get(), (error) => {
    if (error) {
      console.error('Failed to write data:', error);
    }
  });
} finally {
  handle.dispose();
}
🛠️ Refactor suggestion
Unreliable error handling in the write callback + premature release of the buffer handle + ignored backpressure: risk of data loss and memory growth.
- The callback passed to Node.js Writable.write is not a reliable way to detect write errors (the docs recommend listening for the 'error' event), so the current error check may never fire.
- The handle is disposed in finally immediately after the write call, possibly before the underlying flush completes; if the handle is reused or pooled, the written data could be corrupted.
- The backpressure signal (write returning false) is not handled, which can keep piling up memory.
Suggestion: move dispose to the write callback or after drain, add a one-shot error listener, and handle backpressure. Example fix:
send(data: Uint8Array): void {
- const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
- try {
- this.writable.write(handle.get(), (error) => {
- if (error) {
- console.error('Failed to write data:', error);
- }
- });
- } finally {
- handle.dispose();
- }
+ const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
+ const buf = handle.get();
+ const cleanup = () => {
+ try {
+ handle.dispose();
+ } catch (e) {
+ console.warn('[StreamConnection] dispose failed:', e);
+ }
+ };
+ const onError = (err: Error) => {
+ console.error('[StreamConnection] Failed to write data:', err);
+ this.writable.off('drain', onDrain);
+ this.writable.off('error', onError);
+ cleanup();
+ };
+ const onDrain = () => {
+ this.writable.off('drain', onDrain);
+ this.writable.off('error', onError);
+ // safe to clean up once drain fires
+ cleanup();
+ };
+ try {
+ const ok = this.writable.write(buf, () => {
+ // flush completed normally
+ this.writable.off('error', onError);
+ cleanup();
+ });
+ this.writable.once('error', onError);
+ if (!ok) {
+ // backpressure: wait for drain before cleanup
+ this.writable.once('drain', onDrain);
+ }
+ } catch (e) {
+ // handle synchronous exceptions
+ onError(e as Error);
+ }
}
🤖 Prompt for AI Agents
In packages/connection/src/common/connection/drivers/stream.ts around lines 25
to 34, the current write logic treats the Writable.write callback as (error)
which is incorrect, disposes the buffer handle immediately (risking
reuse/pollution), and ignores backpressure; fix by: 1) call writable.write(data)
and capture its boolean return; 2) if write returns false, stop sending further
data and attach a one-time 'drain' listener to resume and only dispose the
handle after drain; 3) attach a one-time 'error' listener on the stream to
log/handle write errors; and 4) move handle.dispose() into the write completion
path (either after write returns true immediately or inside the 'drain' or write
completion callback) so the buffer is not released before the underlying I/O
finishes.
private async processSendQueue() {
  if (this.sending) {
    return;
  }
  this.sending = true;

  while (this.sendQueue.length > 0) {
    const { data, resolve, reject } = this.sendQueue[0];
    let handle: { get: () => Uint8Array; dispose: () => void } | null = null;

    try {
      handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
      const packet = handle.get();

      for (let i = 0; i < packet.byteLength; i += chunkSize) {
        if (!this.isOpen()) {
          throw new Error('Connection closed while sending');
        }

        await new Promise<void>((resolve, reject) => {
          const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
          this.socket.send(chunk, { binary: true }, (error?: Error) => {
            if (error) {
              reject(error);
            } else {
              resolve();
            }
          });
        });
      }

      resolve();
    } catch (error) {
      reject(error instanceof Error ? error : new Error(String(error)));
    } finally {
      if (handle) {
        try {
          handle.dispose();
        } catch (error) {
          console.warn('[WSWebSocket] Error disposing handle:', error);
        }
      }
      this.pendingSize -= this.sendQueue[0].data.byteLength;
      this.sendQueue.shift();
    }
  }

  this.sending = false;
}
🛠️ Refactor suggestion
The queue can stall in a concurrency window: there is a race between the send loop finishing and a new enqueue.
After the last item is processed, the while check sees sendQueue.length === 0 and exits before sending is set back to false; if a new task is enqueued between those two operations, send() will not schedule processing again (it sees sending === true), and the queue can stall until the next explicit send.
Re-check the queue after exiting the loop and restart processing if needed:
- this.sending = false;
+ const shouldContinue = this.sendQueue.length > 0;
+ this.sending = false;
+ if (shouldContinue) {
+ // handle tasks pushed during the concurrency window
+ this.processSendQueue().catch((error) => {
+ console.error('[WSWebSocket] Error processing queue:', error);
+ });
+ }
📝 Committable suggestion
private async processSendQueue() {
  if (this.sending) {
    return;
  }
  this.sending = true;
  while (this.sendQueue.length > 0) {
    const { data, resolve, reject } = this.sendQueue[0];
    let handle: { get: () => Uint8Array; dispose: () => void } | null = null;
    try {
      handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
      const packet = handle.get();
      for (let i = 0; i < packet.byteLength; i += chunkSize) {
        if (!this.isOpen()) {
          throw new Error('Connection closed while sending');
        }
        await new Promise<void>((resolve, reject) => {
          const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength));
          this.socket.send(chunk, { binary: true }, (error?: Error) => {
            if (error) {
              reject(error);
            } else {
              resolve();
            }
          });
        });
      }
      resolve();
    } catch (error) {
      reject(error instanceof Error ? error : new Error(String(error)));
    } finally {
      if (handle) {
        try {
          handle.dispose();
        } catch (error) {
          console.warn('[WSWebSocket] Error disposing handle:', error);
        }
      }
      this.pendingSize -= this.sendQueue[0].data.byteLength;
      this.sendQueue.shift();
    }
  }
  const shouldContinue = this.sendQueue.length > 0;
  this.sending = false;
  if (shouldContinue) {
    // 处理并发窗口中推入的新任务
    this.processSendQueue().catch((error) => {
      console.error('[WSWebSocket] Error processing queue:', error);
    });
  }
}
🤖 Prompt for AI Agents
In packages/connection/src/common/connection/drivers/ws-websocket.ts around
lines 32 to 80, the send loop can race with new enqueues: after the while loop
exits we set sending = false but a new item enqueued between loop exit and that
assignment will never trigger processing. Fix by ensuring we re-check the queue
before finalizing sending — either convert the logic to a do/while that
continues while sendQueue.length>0 or, after setting sending = false,
immediately check if sendQueue.length>0 and if so re-enter processSendQueue
(e.g., call it asynchronously). Also ensure any exceptions keep the re-check
behavior so the queue is not left stalled.
dispose(): void {
  this.socket.removeAllListeners();
  // 拒绝所有待发送的消息
  while (this.sendQueue.length > 0) {
    const { reject } = this.sendQueue.shift()!;
    reject(new Error('Connection disposed'));
  }
  this.pendingSize = 0;
  this.sending = false;
}
🛠️ Refactor suggestion
Removing all listeners in dispose is destructive, and the missing decoder.dispose also leaks.
- this.socket.removeAllListeners() removes listeners added outside this class, breaking external logic.
- The decoder's onData listener and internal buffers are never released.
Suggestion: unbind only the callbacks this class registered, and add decoder.dispose(). Keep a reference to the message callback created in the constructor so it can be unbound precisely:
- constructor(public socket: WebSocket) {
+ private readonly onSocketMessage = (data: Buffer) => {
+ this.decoder.push(data as unknown as Uint8Array);
+ };
+ constructor(public socket: WebSocket) {
super();
- this.socket.on('message', (data: Buffer) => {
- this.decoder.push(data);
- });
+ this.socket.on('message', this.onSocketMessage);
}
@@
dispose(): void {
- this.socket.removeAllListeners();
+ this.socket.off('message', this.onSocketMessage);
+ // do not disturb external listeners for 'close' and other events
+ this.decoder.dispose();
// 拒绝所有待发送的消息
while (this.sendQueue.length > 0) {
const { reject } = this.sendQueue.shift()!;
reject(new Error('Connection disposed'));
}
this.pendingSize = 0;
this.sending = false;
}
📝 Committable suggestion
// Preserve a reference to the message handler so we can remove exactly this listener later
private readonly onSocketMessage = (data: Buffer) => {
  this.decoder.push(data as unknown as Uint8Array);
};

constructor(public socket: WebSocket) {
  super();
  // Attach using the named handler
  this.socket.on('message', this.onSocketMessage);
}

dispose(): void {
  // Only remove our message listener; don't disturb other consumers of the socket
  this.socket.off('message', this.onSocketMessage);
  // Clean up the decoder's internal buffers and listeners
  this.decoder.dispose();
  // 拒绝所有待发送的消息
  while (this.sendQueue.length > 0) {
    const { reject } = this.sendQueue.shift()!;
    reject(new Error('Connection disposed'));
  }
  this.pendingSize = 0;
  this.sending = false;
}
🤖 Prompt for AI Agents
In packages/connection/src/common/connection/drivers/ws-websocket.ts around
lines 114 to 123, avoid calling this.socket.removeAllListeners() because it
removes listeners added by external code; instead, store the specific bound
callbacks used by this class (e.g., this.onOpen, this.onClose, this.onError,
this.onMessage) during construction and call this.socket.off('event', callback)
for each to unbind precisely. Also ensure the decoder is properly disposed: call
decoder.off/onData removal (or decoder.dispose()) to remove its onData listener
and release internal buffers. Keep the existing logic to reject queued sends and
reset pending state, but add explicit unbinding of socket listeners and
decoder.dispose() before clearing queues and toggling flags.
…in WebSocket connections
Actionable comments posted: 0
♻️ Duplicate comments (4)
packages/connection/src/common/connection/drivers/ws-websocket.ts (2)
33-38
: Fix the concurrency race: the queue can stall between the send loop finishing and a new enqueue; re-check on exit and restart if needed. Between the while loop exiting and sending being set back to false, a newly enqueued task will not be scheduled because send() sees sending=true, stalling the queue. Use try/finally to guarantee a re-check before exit, restarting processing when necessary.
private async processSendQueue() { if (this.sending) { return; } - this.sending = true; - - while (this.sendQueue.length > 0) { + this.sending = true; + try { + while (this.sendQueue.length > 0) { const { data, resolve, reject } = this.sendQueue[0]; let handle: { get: () => Uint8Array; dispose: () => void } | null = null; @@ - } - - this.sending = false; + } + } finally { + const shouldContinue = this.sendQueue.length > 0; + this.sending = false; + if (shouldContinue) { + this.processSendQueue().catch((error) => { + console.error('[WSWebSocket] Error processing queue:', error); + }); + } + } }Also applies to: 80-81
28-31
: dispose should not call removeAllListeners, and decoder.dispose is missing; this is destructive and leaks.
- removeAllListeners removes listeners registered externally, breaking integrators' logic.
- The decoder's onData listener and internal buffers are never released, which can leak memory.
Bind an unbindable named callback at construction time, then unbind it precisely and release the decoder in dispose:
- this.socket.on('message', (data: Buffer) => {
-   this.decoder.push(data);
- });
+ this.socket.on('message', this.onSocketMessage);
@@
- dispose(): void {
-   this.socket.removeAllListeners();
+ dispose(): void {
+   this.socket.off('message', this.onSocketMessage);
+   this.decoder.dispose();
    // 拒绝所有待发送的消息
    while (this.sendQueue.length > 0) {
      const { reject } = this.sendQueue.shift()!;
      reject(new Error('Connection disposed'));
    }
    this.pendingSize = 0;
    this.sending = false;
  }
And add the callback field inside the class (place it in the class-field area, for example right after the sending field):
private readonly onSocketMessage = (data: Buffer) => {
  // Buffer is compatible with Uint8Array
  this.decoder.push(data as unknown as Uint8Array);
};
Also applies to: 121-130
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (2)
34-70
: Refactor the send loop: remove the no-op await, add an OPEN wait and backpressure, and fix the race-with-enqueue restart.
- The await new Promise(resolve => { send(); resolve(); }) inside the for loop is a no-op wait that only adds microtask jitter.
- There is no wait for OPEN; sending while CONNECTING/CLOSING may throw or drop data (implementation-dependent).
- There is no minimal backpressure based on bufferedAmount; consecutive large payloads can cause memory peaks and extra latency.
- As in ws-websocket.ts, the race between loop exit and new enqueues can stall the queue.
private async processSendQueue() { if (this.sending) { return; } - this.sending = true; - - while (this.sendQueue.length > 0) { - const { data, resolve, reject } = this.sendQueue[0]; - let handle: { get: () => Uint8Array; dispose: () => void } | null = null; - - try { - handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); - const packet = handle.get(); - - for (let i = 0; i < packet.byteLength; i += chunkSize) { - await new Promise<void>((resolve) => { - const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength)); - this.socket.send(chunk); - resolve(); - }); - } - - resolve(); - } catch (error) { - console.error('[ReconnectingWebSocket] Error sending data:', error); - reject(error); - } finally { - if (handle) { - handle.dispose(); - } - this.pendingSize -= this.sendQueue[0].data.byteLength; - } - this.sendQueue.shift(); - } - - this.sending = false; + this.sending = true; + try { + while (this.sendQueue.length > 0) { + const { data, resolve, reject } = this.sendQueue[0]; + let handle: { get: () => Uint8Array; dispose: () => void } | null = null; + try { + // 等待 OPEN,如期间关闭则中断 + if (!this.isOpen()) { + await new Promise<void>((resolveOpen, rejectOpen) => { + const d1 = this.onOpen(() => { + d1.dispose(); + d2.dispose(); + resolveOpen(); + }); + const d2 = this.onceClose((code, reason) => { + d1.dispose(); + d2.dispose(); + rejectOpen(new Error(`WebSocket closed before send: code=${code}, reason=${reason ?? ''}`)); + }); + }); + } + + handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); + const packet = handle.get(); + + for (let i = 0; i < packet.byteLength; i += chunkSize) { + const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength)); + this.socket.send(chunk); + // 简单背压:缓冲区高于水位时让出事件循环 + while ((this.socket as unknown as WebSocket).bufferedAmount > WS_BUFFERED_HWM) { + await new Promise((r) => setTimeout(r, 0)); + } + } + resolve(); + } catch (error) { + console.error('[ReconnectingWebSocket] Error sending data:', error); + reject(error instanceof Error ? error : new Error(String(error))); + } finally { + if (handle) { + handle.dispose(); + } + this.pendingSize -= this.sendQueue[0].data.byteLength; + this.sendQueue.shift(); + } + } + } finally { + const shouldContinue = this.sendQueue.length > 0; + this.sending = false; + if (shouldContinue) { + this.processSendQueue().catch((error) => { + console.error('[ReconnectingWebSocket] Error processing queue:', error); + }); + } + } }为配合背压,请在文件顶部附近添加高水位常量(或统一抽到 constants.ts):
// 简单背压高水位:8MB(可按场景调优) const WS_BUFFERED_HWM = 8 * 1024 * 1024;
150-154
: dispose does not reject pending sends, reset pendingSize, or release the decoder, leaving upstream awaits hanging and leaking memory. Reject each queued Promise one by one, reset the counter, and call decoder.dispose().
dispose(): void {
  this.socket.removeEventListener('message', this.arrayBufferHandler);
- this.sendQueue = [];
- this.sending = false;
+ // Reject all pending sends
+ const err = new Error('ReconnectingWebSocketConnection disposed');
+ for (const item of this.sendQueue) {
+   try { item.reject(err); } catch {}
+ }
+ this.sendQueue = [];
+ this.pendingSize = 0;
+ this.sending = false;
+ // Release decoder resources
+ this.decoder.dispose();
}
🧹 Nitpick comments (6)
packages/connection/src/common/connection/drivers/ws-websocket.ts (3)
83-102
: send should fail fast when the connection is CLOSED, to avoid pointless queuing that holds memory. Even when the connection is already closed, data is still enqueued for later processing. Do a readiness check before enqueueing and reject directly.
send(data: Uint8Array): Promise<void> {
  return new Promise<void>((resolve, reject) => {
@@
-   this.pendingSize += data.byteLength;
+   // Reject immediately when the connection is already closed (ws does not reconnect automatically)
+   if (!this.isOpen()) {
+     reject(new Error('WebSocket is not open'));
+     return;
+   }
+   this.pendingSize += data.byteLength;
    this.sendQueue.push({ data, resolve, reject });
    this.processSendQueue().catch((error) => {
      console.error('[WSWebSocket] Error processing queue:', error);
    });
  });
}
47-62
: Optional: do a quick isOpen() check before constructing the frame to avoid wasted work. The chunk loop already checks isOpen, but for huge payloads failing early saves one frame construction plus handle creation and release.
- try { - handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); + try { + if (!this.isOpen()) { + throw new Error('Connection closed before framing'); + } + handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); const packet = handle.get();
1-1: Suggest adding test cases for large payloads and interruption:
- Data larger than chunkSize should arrive intact.
- Closing the connection mid-send should reject the current Promise and must not leave later queued items hanging.
- A send enqueued after dispose should be rejected.
If needed, I can draft unit test samples based on the existing test framework; a rough sketch follows.
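The sketch below assumes a Jest-style runner and a hand-rolled mock socket; the WSWebSocketConnection constructor signature, the import path, and the mock shape are assumptions for illustration, not the actual API.

// Sketch only: a mock socket records outgoing chunks so the test can assert that a
// payload larger than chunkSize is flushed completely and split into multiple chunks.
import { WSWebSocketConnection } from '../../src/common/connection/drivers/ws-websocket'; // path assumed

it('sends payloads larger than chunkSize as multiple chunks', async () => {
  const sent: Uint8Array[] = [];
  const mockSocket: any = {
    readyState: 1, // WebSocket.OPEN
    send: (chunk: Uint8Array) => sent.push(chunk),
    addEventListener: () => {},
    removeEventListener: () => {},
  };

  const connection = new WSWebSocketConnection(mockSocket);
  const payload = new Uint8Array(3 * 1024 * 1024).fill(7); // assumes chunkSize is well below 3MB

  await connection.send(payload);

  // The framed output (payload plus length header) is at least as large as the payload.
  const totalBytes = sent.reduce((sum, chunk) => sum + chunk.byteLength, 0);
  expect(totalBytes).toBeGreaterThanOrEqual(payload.byteLength);
  expect(sent.length).toBeGreaterThan(1);
});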
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (3)
72-89: send has no fallback logging for exceptions thrown by the processing loop; align it with the ws driver to improve observability. It currently calls processSendQueue() directly; add a catch to avoid silent failures. The pendingSize and queue boundary checks are otherwise sound, LGTM.

-   this.pendingSize += data.byteLength;
-   this.sendQueue.push({ data, resolve, reject });
-   this.processSendQueue();
+   this.pendingSize += data.byteLength;
+   this.sendQueue.push({ data, resolve, reject });
+   this.processSendQueue().catch((error) => {
+     console.error('[ReconnectingWebSocket] Error processing queue:', error);
+   });
16-23: Consider extracting a generic "queued sender" to keep the two driver implementations from drifting apart. The send/backpressure/restart/cleanup logic of both drivers is highly similar and prone to subtle divergence; extract it into a reusable helper (e.g. QueuedSender/FrameSender) and let each driver inject its send/readyState/backpressure adapter, lowering maintenance cost and regression risk. A rough sketch of the shape is shown below.
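A minimal sketch of what such a helper could look like; the names QueuedSenderAdapter/QueuedSender and the adapter methods are illustrative, not an existing API, and framing/backpressure are left to the adapter.

// Sketch only: the shared queue and single-flight logic, with driver-specific parts injected.
interface QueuedSenderAdapter {
  isOpen(): boolean;
  sendChunk(chunk: Uint8Array): void;
}

class QueuedSender {
  private queue: Array<{ data: Uint8Array; resolve: () => void; reject: (e: Error) => void }> = [];
  private sending = false;

  constructor(private adapter: QueuedSenderAdapter, private chunkSize: number) {}

  send(data: Uint8Array): Promise<void> {
    return new Promise((resolve, reject) => {
      this.queue.push({ data, resolve, reject });
      void this.process();
    });
  }

  private async process(): Promise<void> {
    if (this.sending) {
      return;
    }
    this.sending = true;
    try {
      while (this.queue.length > 0) {
        const { data, resolve, reject } = this.queue[0];
        try {
          if (!this.adapter.isOpen()) {
            throw new Error('connection is not open');
          }
          for (let i = 0; i < data.byteLength; i += this.chunkSize) {
            this.adapter.sendChunk(data.subarray(i, Math.min(i + this.chunkSize, data.byteLength)));
          }
          await Promise.resolve(); // yield between queue items
          resolve();
        } catch (e) {
          reject(e instanceof Error ? e : new Error(String(e)));
        } finally {
          this.queue.shift();
        }
      }
    } finally {
      this.sending = false;
    }
  }
}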
24-32: Having the constructor throw directly on blob is acceptable; if compatibility is needed, a fallback to arrayBuffer could be considered. Not required, decide based on the product's positioning. The current implementation is clear. A possible shape for that fallback is sketched below.
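Should that fallback ever be wanted, a minimal sketch under assumptions: the decoder's push(bytes) method and the handler wiring are inferred from the framing API discussed in this PR, not the existing constructor.

// Sketch only: tolerate Blob frames by converting them before feeding the decoder.
function attachBinaryMessageHandler(
  socket: WebSocket,
  decoder: { push(bytes: Uint8Array): void },
): void {
  socket.binaryType = 'arraybuffer';
  socket.addEventListener('message', async (event: MessageEvent) => {
    // Some environments may still deliver Blob; normalize to ArrayBuffer first.
    const raw = event.data instanceof Blob ? await event.data.arrayBuffer() : (event.data as ArrayBuffer);
    decoder.push(new Uint8Array(raw));
  });
}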
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (3 hunks)
- packages/connection/src/common/connection/drivers/ws-websocket.ts (2 hunks)
- packages/connection/src/common/constants.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/connection/src/common/constants.ts
🧰 Additional context used
🧬 Code graph analysis (2)
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (2)
- packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
  - LengthFieldBasedFrameDecoder (26-230)
- packages/connection/src/common/constants.ts (1)
  - chunkSize (6-6)
packages/connection/src/common/connection/drivers/ws-websocket.ts (2)
- packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
  - LengthFieldBasedFrameDecoder (26-230)
- packages/connection/src/common/constants.ts (1)
  - chunkSize (6-6)
🔇 Additional comments (4)
packages/connection/src/common/connection/drivers/ws-websocket.ts (2)
11-21: The queue and rate-limit constants are clear and easy to reason about, LGTM
105-107: onMessage is wired to decoder.onData correctly, LGTM
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (2)
145-148: The arrayBuffer inbound path feeds the decoder correctly, LGTM
104-106: onMessage uses the decoder.onData callback mechanism consistently, LGTM
Actionable comments posted: 0
🧹 Nitpick comments (7)
packages/editor/src/browser/doc-model/large-file-optimizer.ts (7)
1-1: When it is used only as a type, switch to import type to keep the bundle and runtime lean. monaco.editor.IEditorOptions is currently referenced only in type positions, so the import can become import type, avoiding pulling runtime code into the bundle.

-import * as monaco from '@opensumi/ide-monaco';
+import type * as monaco from '@opensumi/ide-monaco';
22-44: Only downgrade automaticLayout when it has not been explicitly configured, to avoid breaking the host's layout expectations. Unconditionally setting automaticLayout to false can stop the editor from re-laying out when its container is resized, hurting UX. Disable it only when upstream has not configured it.

- optimizedOptions.automaticLayout = false;
+ if (optimizedOptions.automaticLayout === undefined) {
+   optimizedOptions.automaticLayout = false;
+ }
50-56: Turn the features off while merging the nested objects, preserving existing fine-grained settings. Overwriting the minimap/hover sub-objects outright drops any other fine-grained settings the caller put into baseOptions (such as side, renderCharacters, delay). Shallow-merge onto the existing object and only force enabled off.

- if (optimizations.disableMinimap) {
-   optimizedOptions.minimap = { enabled: false };
- }
+ if (optimizations.disableMinimap) {
+   optimizedOptions.minimap = {
+     ...(optimizedOptions.minimap ?? {}),
+     enabled: false,
+   };
+ }

- if (optimizations.disableHover) {
-   optimizedOptions.hover = { enabled: false };
- }
+ if (optimizations.disableHover) {
+   optimizedOptions.hover = {
+     ...(optimizedOptions.hover ?? {}),
+     enabled: false,
+   };
+ }
58-66: The interface declares disableSemanticHighlighting but it never takes effect. ILargeFileOptimizationOptions exposes disableSemanticHighlighting, yet getLargeFileOptimizedEditorOptions does not apply the switch, so the API's stated semantics and actual behavior diverge. To disable semantic highlighting (integration-version dependent, usually semanticHighlighting.enabled), handle it as below; if the current integration does not support it, remove the field from the interface or document that it has no effect.

  if (optimizations.disableQuickSuggestions) {
    optimizedOptions.quickSuggestions = false;
    optimizedOptions.suggestOnTriggerCharacters = false;
  }
+
+ if (optimizations.disableSemanticHighlighting) {
+   // Note: the key may be semanticHighlighting.enabled depending on the version/wrapper layer
+   // `any` is used here to accommodate differences in OpenSumi's Monaco type declarations
+   (optimizedOptions as any).semanticHighlighting = { enabled: false };
+ }
73-86: More robust line counting and threshold comparison; extract shared threshold constants.
- content.split('\n') builds a large array and amplifies memory usage; count lines in a streaming fashion instead.
- Use >= for the thresholds so a file exactly at the threshold is still optimized.
- Extract the thresholds as exported constants so they share a single source with the call sites (e.g. base-editor-wrapper).

-export function shouldOptimizeForLargeFile(fileSizeBytes: number, content?: string): boolean {
-  const SIZE_THRESHOLD = 10 * 1024 * 1024; // 10MB
-  const LINE_THRESHOLD = 50000; // 50k lines
+export function shouldOptimizeForLargeFile(fileSizeBytes: number, content?: string): boolean {
+  const SIZE_THRESHOLD = DEFAULT_SIZE_THRESHOLD; // 10MB
+  const LINE_THRESHOLD = DEFAULT_LINE_THRESHOLD; // 50k lines
-
-  if (fileSizeBytes > SIZE_THRESHOLD) {
+  if (fileSizeBytes >= SIZE_THRESHOLD) {
     return true;
   }

-  if (content && content.split('\n').length > LINE_THRESHOLD) {
+  if (content && countLines(content) > LINE_THRESHOLD) {
     return true;
   }

   return false;
 }

Supporting helpers/constants added elsewhere in the file (needed alongside the diff above):

// Suggested placement: near the top of the file, after the interface
export const DEFAULT_SIZE_THRESHOLD = 10 * 1024 * 1024; // 10MB
export const DEFAULT_LINE_THRESHOLD = 50_000; // 50k lines

function countLines(text: string): number {
  // Return 0 for the empty string; avoids the large array a split would create
  if (text.length === 0) return 0;
  let lines = 1;
  for (let i = 0; i < text.length; i++) {
    if (text.charCodeAt(i) === 10 /* '\n' */) lines++;
  }
  return lines;
}

In addition, given the integrator-side logic (e.g. hover disabled above 20MB, quick suggestions above 30MB), consider providing a factory that computes the optimization switches from size/line count and centrally produces ILargeFileOptimizationOptions, eliminating inconsistencies from scattered thresholds. If needed, I can add an example implementation; a rough sketch follows.
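A possible shape under assumptions: the function name is illustrative, the 10MB/50k-line base and the 20MB/30MB tiers come from the thresholds mentioned above, and the ILargeFileOptimizationOptions fields are assumed to be optional booleans as in the reviewed file.

// Sketch only: centralizes the thresholds discussed above into one factory.
export function computeLargeFileOptimizations(
  fileSizeBytes: number,
  lineCount?: number,
): ILargeFileOptimizationOptions {
  const MB = 1024 * 1024;
  const isLarge = fileSizeBytes >= 10 * MB || (lineCount ?? 0) > 50_000;
  if (!isLarge) {
    return { optimizedMode: false };
  }
  return {
    optimizedMode: true,
    disableMinimap: true,
    disableCodeLens: true,
    disableWordWrap: true,
    disableHover: fileSizeBytes >= 20 * MB, // hover disabled only above the 20MB tier
    disableQuickSuggestions: fileSizeBytes >= 30 * MB, // quick suggestions only above the 30MB tier
  };
}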
91-100: Tighten the type of metadata and normalize the booleans. any amplifies uncertainty, and passing values straight through may accept truthy strings such as 'true'. Switch to unknown/Record<string, unknown> and normalize every flag with Boolean().

-export function extractOptimizationOptions(metadata: any = {}): ILargeFileOptimizationOptions {
-  return {
-    optimizedMode: metadata.optimizedMode,
-    disableSemanticHighlighting: metadata.disableSemanticHighlighting,
-    disableWordWrap: metadata.disableWordWrap,
-    disableMinimap: metadata.disableMinimap,
-    disableHover: metadata.disableHover,
-    disableCodeLens: metadata.disableCodeLens,
-    disableQuickSuggestions: metadata.disableQuickSuggestions,
-  };
-}
+export function extractOptimizationOptions(metadata: unknown = {}): ILargeFileOptimizationOptions {
+  const m = (metadata as Record<string, unknown>) || {};
+  return {
+    optimizedMode: Boolean(m.optimizedMode),
+    disableSemanticHighlighting: Boolean(m.disableSemanticHighlighting),
+    disableWordWrap: Boolean(m.disableWordWrap),
+    disableMinimap: Boolean(m.disableMinimap),
+    disableHover: Boolean(m.disableHover),
+    disableCodeLens: Boolean(m.disableCodeLens),
+    disableQuickSuggestions: Boolean(m.disableQuickSuggestions),
+  };
+}
1-102: Add unit tests covering the key thresholds and the main switches. Suggested cases:
- shouldOptimizeForLargeFile: cover the boundaries (10MB, exactly 50k lines) and large-content scenarios.
- getLargeFileOptimizedEditorOptions: with each disable* switch on, confirm the corresponding fields are turned off; verify that other minimap/hover sub-fields in baseOptions are not overwritten.
If needed, I can submit a minimal set of test samples based on Jest/ts-jest; a rough sketch follows.
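A minimal Jest-style sketch; the import path and the argument order of getLargeFileOptimizedEditorOptions are assumptions, and the last assertion only holds once the shallow-merge fix suggested above is applied.

// Sketch only: exercises the size threshold, the line threshold, and the minimap merge behavior.
import {
  getLargeFileOptimizedEditorOptions,
  shouldOptimizeForLargeFile,
} from '../../src/browser/doc-model/large-file-optimizer'; // path assumed

describe('large-file-optimizer', () => {
  it('flags files above the 10MB threshold', () => {
    expect(shouldOptimizeForLargeFile(10 * 1024 * 1024 + 1)).toBe(true);
    expect(shouldOptimizeForLargeFile(1024)).toBe(false);
  });

  it('flags content above the 50k line threshold', () => {
    expect(shouldOptimizeForLargeFile(0, 'x\n'.repeat(50_001))).toBe(true);
  });

  it('disables minimap without clobbering other minimap settings', () => {
    const options = getLargeFileOptimizedEditorOptions(
      { disableMinimap: true },
      { minimap: { enabled: true, side: 'left' } },
    );
    expect(options.minimap?.enabled).toBe(false);
    expect(options.minimap?.side).toBe('left'); // requires the shallow-merge fix above
  });
});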
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- packages/editor/src/browser/doc-model/large-file-optimizer.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/editor/src/browser/doc-model/large-file-optimizer.ts (1)
- packages/monaco/src/browser/monaco-api/editor.ts (1)
  - IEditorOptions (132-132)
🔇 Additional comments (2)
packages/editor/src/browser/doc-model/large-file-optimizer.ts (2)
13-21: The overall design direction is sound and the API is clear and easy to use
- Decoupling the "is this a large file" decision from "how to build options from the switches" makes it easy to reuse across entry points.
- The shallow copy of baseOptions avoids mutating the caller's input directly, 👍
29-33: The type definitions of showFoldingControls and occurrencesHighlight need manual verification. The declarations of these two properties could not be found in the repository; they likely live in an external dependency (such as monaco-editor's type files). Please confirm the following manually:
- Which values IEditorOptions.showFoldingControls allows in the official types (usually 'always' or 'mouseover'); 'never' is not supported.
- IEditorOptions.occurrencesHighlight is a boolean in the official types and does not accept the string 'off'.
Adjust as follows based on what you confirm:

- optimizedOptions.showFoldingControls = 'never';
+ // Folding is already disabled; remove this setting or set it to 'mouseover'
- optimizedOptions.occurrencesHighlight = 'off';
+ optimizedOptions.occurrencesHighlight = false;
Types
Background or solution
Changelog
Summary by CodeRabbit
New Features
Performance
Bug Fixes