diff --git a/TERMIX_MIGRATION.md b/TERMIX_MIGRATION.md index 5a5deb2..c25f411 100644 --- a/TERMIX_MIGRATION.md +++ b/TERMIX_MIGRATION.md @@ -155,9 +155,26 @@ One real bug was caught and fixed: the first version ran all 10 collectors via ` - The frontend page was typechecked and manually reviewed, and the route was confirmed to be served by Vite, but **not visually verified in a browser** — Playwright wasn't available in this sandbox for this phase (no cached install found). This is a real verification gap, not a claim of UI testing that didn't happen. - All test artifacts (test `sshd` process, test OS user, test backend instance, test DB, tokens, temp env/log files) were cleaned up afterward. +### Phase 7 — Host-to-Host File Transfer (DONE) + +**Architecture decision**: Termix's `host-transfer.ts` (3,428 lines, plus `transfer-paths.ts`/`transfer-routing.ts`) is a heavily over-engineered system — parallel-segment workers, a tar-vs-per-file-SFTP method selector driven by incompressibility heuristics, hung-stream watchdogs, retry orchestration, worker caches, archive-method previews. Per the same stance taken in every prior phase, only the **core value** was ported: streaming a file/directory from one SSH host to another through the backend (read from the source's SFTP, write to the destination's SFTP, item by item). This is exactly the `item_sftp` path Termix itself falls back to in most cases; the parallel/tar/watchdog machinery is left behind as unjustified at this app's scale. Reuses ArchNest's existing `connectTarget` SSH helper (jump-host support inherited for free on both ends), not Termix's connection pool/session manager. Delivery mirrors Phase 2/6: an in-memory transfer registry + REST polling, no websockets. + +**What was built:** +- `backend/src/ssh/transfer.ts` — the transfer engine. `startTransfer()` returns a `transferId` and runs asynchronously: opens an SFTP connection to both hosts, scans the source tree up front (depth-first walk) to compute `totalFiles`/`totalBytes` for a real progress bar, recreates the directory structure on the destination, then streams each file (source `createReadStream` → dest `createWriteStream`). Tracks live progress in an in-memory `activeTransfers` map; supports `move` (deletes the source tree, files-then-dirs-deepest-first, after a successful copy) and cooperative cancellation (a flag checked between files and on every read chunk). `cleanupOldTransfers()` drops finished entries after an hour. +- `backend/src/routes/transfer.ts` — `POST /api/transfers` (start), `GET /api/transfers` (list), `GET /api/transfers/:id` (status), `POST /api/transfers/:id/cancel`. All authenticated; start is zod-validated. +- `src/pages/Files.tsx` — added a per-entry "Send to another host" action (disabled unless ≥2 SSH hosts exist) opening a modal (destination host dropdown, destination directory, move checkbox), plus a live "Host-to-Host Transfers" panel that polls (1s while any transfer is running, 5s otherwise) and shows per-transfer progress bars, current file, status, and a cancel button. +- `src/lib/api.ts` — `startTransfer`/`listTransfers`/`getTransfer`/`cancelTransfer` + `TransferProgress` type. + +**Verified end-to-end** against two real SSH endpoints (a real `sshd` with two real OS users as source/dest, not mocked): created two real `ssh`-type integrations and exercised all four behaviours over the real API: +- **Recursive directory copy** of a tree (text file + a 100 KB random binary + a nested subdir): completed 3/3 files / 100,019 bytes; verified on disk that the directory structure was recreated, text content was intact, and the binary's `md5sum` matched the source exactly. +- **Move**: a single file transferred with `move:true` — confirmed present on the destination and **deleted from the source** afterward. +- **Error handling**: a transfer of a nonexistent source path ended `status: "failed"` with a clear `"No such file"` error rather than hanging. +- **Cancellation**: an 80 MB transfer cancelled ~0.3 s in stopped at 162 KB with `status: "cancelled"` — confirming the mid-stream cancel flag actually interrupts the copy. + +All test artifacts (test `sshd`, both test OS users + their home dirs, test backend instance, test DB, temp files) were cleaned up afterward. + ### Also worth checking during/after the phases above -- `src/backend/ssh/host-transfer.ts` (3,428 lines) — appears to be server-to-server file/data transfer; likely folds into Phase 3 (file manager) rather than being separate. - Data export/import of SSH hosts/credentials/file-manager data — a nice-to-have, not yet scheduled. ## Tracking diff --git a/backend/src/routes/transfer.ts b/backend/src/routes/transfer.ts new file mode 100644 index 0000000..14dae2d --- /dev/null +++ b/backend/src/routes/transfer.ts @@ -0,0 +1,48 @@ +import type { FastifyInstance } from 'fastify' +import { z } from 'zod' +import { logEvent } from '../db/index.js' +import { startTransfer, getTransfer, listTransfers, cancelTransfer } from '../ssh/transfer.js' + +const startSchema = z.object({ + sourceIntegrationId: z.number().int().positive(), + destIntegrationId: z.number().int().positive(), + sourcePaths: z.array(z.string().min(1)).min(1), + destPath: z.string().min(1), + move: z.boolean().optional(), +}) + +export async function transferRoutes(app: FastifyInstance) { + app.addHook('onRequest', app.authenticate) + + app.post('/api/transfers', async (req, reply) => { + const parsed = startSchema.safeParse(req.body) + if (!parsed.success) { + return reply.code(400).send({ error: parsed.error.issues[0]?.message ?? 'Invalid input' }) + } + const transferId = startTransfer(parsed.data) + logEvent( + 'host_transfer_started', + `${parsed.data.move ? 'Move' : 'Copy'} of ${parsed.data.sourcePaths.length} item(s) between hosts`, + 'ssh', + ) + return reply.code(201).send({ transferId }) + }) + + app.get('/api/transfers', async () => { + return { transfers: listTransfers() } + }) + + app.get('/api/transfers/:id', async (req, reply) => { + const id = (req.params as { id: string }).id + const transfer = getTransfer(id) + if (!transfer) return reply.code(404).send({ error: 'Transfer not found' }) + return transfer + }) + + app.post('/api/transfers/:id/cancel', async (req, reply) => { + const id = (req.params as { id: string }).id + const ok = cancelTransfer(id) + if (!ok) return reply.code(409).send({ error: 'Transfer is not running' }) + return { ok: true } + }) +} diff --git a/backend/src/server.ts b/backend/src/server.ts index f33e5a7..6f5d9f0 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -14,6 +14,7 @@ import { fileRoutes } from './routes/files.js' import { dockerRoutes, dockerExecRoutes } from './routes/docker.js' import { guacamoleRoutes } from './routes/guacamole.js' import { metricsRoutes } from './routes/metrics.js' +import { transferRoutes } from './routes/transfer.js' import { startAutoStartTunnels } from './tunnels/manager.js' const JWT_SECRET = process.env.ARCHNEST_JWT_SECRET @@ -47,6 +48,7 @@ await app.register(dockerRoutes) await app.register(dockerExecRoutes) await app.register(guacamoleRoutes) await app.register(metricsRoutes) +await app.register(transferRoutes) app.get('/api/health', async () => ({ ok: true })) diff --git a/backend/src/ssh/transfer.ts b/backend/src/ssh/transfer.ts new file mode 100644 index 0000000..7d47b79 --- /dev/null +++ b/backend/src/ssh/transfer.ts @@ -0,0 +1,275 @@ +import { randomUUID } from 'node:crypto' +import type { Client, SFTPWrapper, FileEntry } from 'ssh2' +import { loadSshHost, connectTarget } from './connect.js' + +/** + * Host-to-host file transfer, streamed through the backend: read from the source + * host's SFTP and write to the destination host's SFTP, item by item. + * + * This deliberately ports only the core of Termix's host-transfer feature (its + * "item_sftp" path). The fork's parallel-segment workers, tar-vs-sftp heuristics, + * hung-stream watchdogs and retry orchestration (~3,400 lines) are left behind: + * at this app's scale a single streamed copy per file is simple, correct, and + * cancellable, which is what the feature actually needs. + */ + +export type TransferStatus = 'running' | 'completed' | 'failed' | 'cancelled' + +export interface TransferProgress { + transferId: string + status: TransferStatus + sourceIntegrationId: number + destIntegrationId: number + sourcePaths: string[] + destPath: string + move: boolean + totalFiles: number + totalBytes: number + filesTransferred: number + bytesTransferred: number + currentFile: string | null + error: string | null + startedAt: number + finishedAt: number | null +} + +interface SftpConnection { + conn: Client + jumpConn: Client | null + sftp: SFTPWrapper +} + +const activeTransfers = new Map() +const cancelRequested = new Set() + +function openSftp(integrationId: number): Promise { + const target = loadSshHost(integrationId) + if (!target) return Promise.reject(new Error(`SSH integration ${integrationId} not found`)) + return new Promise((resolve, reject) => { + let jumpConn: Client | null = null + const result = connectTarget( + target, + (client) => { + client.sftp((err, sftp) => { + if (err) { + client.end() + jumpConn?.end() + reject(err) + return + } + resolve({ conn: client, jumpConn, sftp }) + }) + }, + (message) => { + jumpConn?.end() + reject(new Error(message)) + }, + ) + jumpConn = result.jumpConn + }) +} + +function closeSftp(c: SftpConnection | null) { + if (!c) return + c.conn.end() + c.jumpConn?.end() +} + +function sftpStat(sftp: SFTPWrapper, path: string) { + return new Promise((resolve, reject) => + sftp.stat(path, (err, stats) => (err ? reject(err) : resolve(stats))), + ) +} + +function sftpReaddir(sftp: SFTPWrapper, path: string) { + return new Promise((resolve, reject) => + sftp.readdir(path, (err, list) => (err ? reject(err) : resolve(list))), + ) +} + +function sftpMkdir(sftp: SFTPWrapper, path: string) { + return new Promise((resolve) => + // tolerate "already exists" — the only failure mode we care about surfaces on write + sftp.mkdir(path, () => resolve()), + ) +} + +function sftpUnlink(sftp: SFTPWrapper, path: string) { + return new Promise((resolve, reject) => + sftp.unlink(path, (err) => (err ? reject(err) : resolve())), + ) +} + +function sftpRmdir(sftp: SFTPWrapper, path: string) { + return new Promise((resolve, reject) => + sftp.rmdir(path, (err) => (err ? reject(err) : resolve())), + ) +} + +interface WalkItem { + sourcePath: string + destPath: string + isDirectory: boolean + size: number +} + +/** Depth-first walk of a source item, producing the list of dirs+files to create/copy. */ +async function walk(sftp: SFTPWrapper, sourcePath: string, destPath: string): Promise { + const stats = await sftpStat(sftp, sourcePath) + if (stats.isDirectory()) { + const items: WalkItem[] = [{ sourcePath, destPath, isDirectory: true, size: 0 }] + const entries = await sftpReaddir(sftp, sourcePath) + for (const entry of entries) { + const childSource = `${sourcePath.replace(/\/$/, '')}/${entry.filename}` + const childDest = `${destPath.replace(/\/$/, '')}/${entry.filename}` + items.push(...(await walk(sftp, childSource, childDest))) + } + return items + } + return [{ sourcePath, destPath, isDirectory: false, size: stats.size }] +} + +function streamCopy( + source: SftpConnection, + dest: SftpConnection, + item: WalkItem, + transferId: string, + onChunk: (bytes: number) => void, +): Promise { + return new Promise((resolve, reject) => { + const readStream = source.sftp.createReadStream(item.sourcePath) + const writeStream = dest.sftp.createWriteStream(item.destPath) + let settled = false + const finish = (err?: Error) => { + if (settled) return + settled = true + if (err) { + readStream.destroy() + writeStream.destroy() + reject(err) + } else { + resolve() + } + } + readStream.on('data', (chunk: Buffer) => { + if (cancelRequested.has(transferId)) { + finish(new Error('Transfer cancelled')) + return + } + onChunk(chunk.length) + }) + readStream.on('error', finish) + writeStream.on('error', finish) + writeStream.on('close', () => finish()) + readStream.pipe(writeStream) + }) +} + +async function run(progress: TransferProgress) { + let source: SftpConnection | null = null + let dest: SftpConnection | null = null + try { + source = await openSftp(progress.sourceIntegrationId) + dest = await openSftp(progress.destIntegrationId) + + // Scan phase: enumerate everything and compute totals up front so the UI can show a real bar. + const allItems: WalkItem[] = [] + for (const sourcePath of progress.sourcePaths) { + const name = sourcePath.replace(/\/$/, '').split('/').filter(Boolean).pop() ?? sourcePath + const itemDest = `${progress.destPath.replace(/\/$/, '')}/${name}` + allItems.push(...(await walk(source.sftp, sourcePath, itemDest))) + } + const files = allItems.filter((i) => !i.isDirectory) + progress.totalFiles = files.length + progress.totalBytes = files.reduce((sum, f) => sum + f.size, 0) + + // Create the destination root + all directories first (depth-first order keeps parents before children). + await sftpMkdir(dest.sftp, progress.destPath) + for (const dir of allItems.filter((i) => i.isDirectory)) { + await sftpMkdir(dest.sftp, dir.destPath) + } + + // Copy files. + for (const file of files) { + if (cancelRequested.has(progress.transferId)) throw new Error('Transfer cancelled') + progress.currentFile = file.sourcePath + await streamCopy(source, dest, file, progress.transferId, (bytes) => { + progress.bytesTransferred += bytes + }) + progress.filesTransferred += 1 + } + + // On move, remove the source tree (files first, then dirs deepest-first). + if (progress.move) { + for (const file of files) await sftpUnlink(source.sftp, file.sourcePath) + const dirs = allItems.filter((i) => i.isDirectory).reverse() + for (const dir of dirs) await sftpRmdir(source.sftp, dir.sourcePath) + } + + progress.status = 'completed' + progress.currentFile = null + } catch (err) { + progress.status = cancelRequested.has(progress.transferId) ? 'cancelled' : 'failed' + progress.error = err instanceof Error ? err.message : 'Transfer failed' + } finally { + progress.finishedAt = Date.now() + cancelRequested.delete(progress.transferId) + closeSftp(source) + closeSftp(dest) + } +} + +export function startTransfer(req: { + sourceIntegrationId: number + destIntegrationId: number + sourcePaths: string[] + destPath: string + move?: boolean +}): string { + const transferId = randomUUID() + const progress: TransferProgress = { + transferId, + status: 'running', + sourceIntegrationId: req.sourceIntegrationId, + destIntegrationId: req.destIntegrationId, + sourcePaths: req.sourcePaths, + destPath: req.destPath, + move: req.move ?? false, + totalFiles: 0, + totalBytes: 0, + filesTransferred: 0, + bytesTransferred: 0, + currentFile: null, + error: null, + startedAt: Date.now(), + finishedAt: null, + } + activeTransfers.set(transferId, progress) + void run(progress) + return transferId +} + +export function getTransfer(transferId: string): TransferProgress | undefined { + return activeTransfers.get(transferId) +} + +export function listTransfers(): TransferProgress[] { + return Array.from(activeTransfers.values()).sort((a, b) => b.startedAt - a.startedAt) +} + +export function cancelTransfer(transferId: string): boolean { + const progress = activeTransfers.get(transferId) + if (!progress || progress.status !== 'running') return false + cancelRequested.add(transferId) + return true +} + +/** Drops finished transfers older than maxAgeMs so the map doesn't grow unbounded. */ +export function cleanupOldTransfers(maxAgeMs = 60 * 60 * 1000): void { + const now = Date.now() + for (const [id, progress] of activeTransfers.entries()) { + if (progress.status !== 'running' && progress.finishedAt && now - progress.finishedAt > maxAgeMs) { + activeTransfers.delete(id) + } + } +} diff --git a/src/lib/api.ts b/src/lib/api.ts index bfa403c..8c50d52 100644 --- a/src/lib/api.ts +++ b/src/lib/api.ts @@ -146,6 +146,12 @@ export const api = { }), getHostMetrics: (integrationId: number) => apiFetch(`/integrations/${integrationId}/metrics`), + + startTransfer: (data: { sourceIntegrationId: number; destIntegrationId: number; sourcePaths: string[]; destPath: string; move?: boolean }) => + apiFetch<{ transferId: string }>('/transfers', { method: 'POST', body: JSON.stringify(data) }), + listTransfers: () => apiFetch<{ transfers: TransferProgress[] }>('/transfers'), + getTransfer: (id: string) => apiFetch(`/transfers/${id}`), + cancelTransfer: (id: string) => apiFetch<{ ok: boolean }>(`/transfers/${id}/cancel`, { method: 'POST' }), } export interface AuthUser { @@ -244,6 +250,24 @@ export interface Resource { integration: string } +export interface TransferProgress { + transferId: string + status: 'running' | 'completed' | 'failed' | 'cancelled' + sourceIntegrationId: number + destIntegrationId: number + sourcePaths: string[] + destPath: string + move: boolean + totalFiles: number + totalBytes: number + filesTransferred: number + bytesTransferred: number + currentFile: string | null + error: string | null + startedAt: number + finishedAt: number | null +} + export interface HostMetrics { cpu: { percent: number | null; cores: number | null; load: [number, number, number] | null } memory: { percent: number | null; usedGiB: number | null; totalGiB: number | null } diff --git a/src/pages/Files.tsx b/src/pages/Files.tsx index 65b9ae4..05de482 100644 --- a/src/pages/Files.tsx +++ b/src/pages/Files.tsx @@ -11,8 +11,9 @@ import { Save, X, RefreshCw, + Send, } from 'lucide-react' -import { api, type FileEntry, type Integration } from '../lib/api' +import { api, type FileEntry, type Integration, type TransferProgress } from '../lib/api' const TEXT_PRIMARY = '#E8E6E0' const TEXT_SECONDARY = '#7A7D85' @@ -66,6 +67,12 @@ export default function Files() { const [editingEncoding, setEditingEncoding] = useState<'utf8' | 'base64'>('utf8') const [savingEdit, setSavingEdit] = useState(false) + const [transferEntry, setTransferEntry] = useState(null) + const [transferDestId, setTransferDestId] = useState('') + const [transferDestPath, setTransferDestPath] = useState('.') + const [transferMove, setTransferMove] = useState(false) + const [transfers, setTransfers] = useState([]) + const fileInputRef = useRef(null) useEffect(() => { @@ -182,6 +189,57 @@ export default function Files() { } } + function openTransfer(entry: FileEntry) { + const otherHost = hosts.find((h) => h.id !== integrationId) + setTransferEntry(entry) + setTransferDestId(otherHost ? otherHost.id : '') + setTransferDestPath('.') + setTransferMove(false) + } + + async function startTransfer() { + if (!integrationId || !transferEntry || !transferDestId) return + setError(null) + try { + await api.startTransfer({ + sourceIntegrationId: integrationId, + destIntegrationId: transferDestId, + sourcePaths: [joinPath(path, transferEntry.name)], + destPath: transferDestPath, + move: transferMove, + }) + setTransferEntry(null) + pollTransfers() + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to start transfer') + } + } + + function pollTransfers() { + api.listTransfers().then(({ transfers }) => setTransfers(transfers)).catch(() => {}) + } + + useEffect(() => { + pollTransfers() + const anyRunning = transfers.some((t) => t.status === 'running') + const interval = setInterval(pollTransfers, anyRunning ? 1000 : 5000) + return () => clearInterval(interval) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [transfers.some((t) => t.status === 'running')]) + + async function handleCancelTransfer(id: string) { + try { + await api.cancelTransfer(id) + pollTransfers() + } catch { + // ignore — status poll will reflect reality + } + } + + function hostName(id: number): string { + return hosts.find((h) => h.id === id)?.name ?? `#${id}` + } + const breadcrumbs = path === '.' || path === '' ? [] : path.split('/').filter(Boolean) return ( @@ -295,6 +353,14 @@ export default function Files() { )} + @@ -316,6 +382,105 @@ export default function Files() { + {transfers.length > 0 && ( +
+
+ + Host-to-Host Transfers + + +
+ {transfers.map((t) => { + const pct = t.totalBytes > 0 ? Math.round((t.bytesTransferred / t.totalBytes) * 100) : t.status === 'completed' ? 100 : 0 + const statusColor = + t.status === 'completed' ? '#2ECC71' : t.status === 'failed' ? '#E74C3C' : t.status === 'cancelled' ? '#E67E22' : GOLD + return ( +
+
+ + {t.move ? 'Move' : 'Copy'} {hostName(t.sourceIntegrationId)} → {hostName(t.destIntegrationId)}: {t.destPath} + +
+ + {t.status === 'running' + ? `${t.filesTransferred}/${t.totalFiles} files · ${formatSize(t.bytesTransferred)}` + : t.status} + + {t.status === 'running' && ( + + )} +
+
+
+
+
+ {t.error && {t.error}} + {t.status === 'running' && t.currentFile && ( + {t.currentFile} + )} +
+ ) + })} +
+ )} + + {transferEntry && ( +
+
+
+ + Send "{transferEntry.name}" to another host + + +
+ + + + setTransferDestPath(e.target.value)} + placeholder="e.g. /home/user or ." + /> + +
+ + +
+
+
+ )} + {editingPath && (