Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: create & delete sync sessions over gRPC #119

Merged
merged 6 commits into from
Mar 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -21,4 +21,8 @@ final class PreviewFileSync: FileSyncDaemon {
func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {}

func deleteSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}

func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}

func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
}
18 changes: 16 additions & 2 deletions Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift
Original file line number Diff line number Diff line change
@@ -51,11 +51,15 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
loading = true
defer { loading = false }
do throws(DaemonError) {
// TODO: Support selecting & deleting multiple sessions at once
try await fileSync.deleteSessions(ids: [selection!])
if fileSync.sessionState.isEmpty {
// Last session was deleted, stop the daemon
await fileSync.stop()
}
} catch {
deleteError = error
}
await fileSync.refreshSessions()
selection = nil
}
} label: {
@@ -65,7 +69,17 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
if let selectedSession = fileSync.sessionState.first(where: { $0.id == selection }) {
Divider()
Button {
// TODO: Pause & Unpause
Task {
// TODO: Support pausing & resuming multiple sessions at once
loading = true
defer { loading = false }
switch selectedSession.status {
case .paused:
try await fileSync.resumeSessions(ids: [selectedSession.id])
default:
try await fileSync.pauseSessions(ids: [selectedSession.id])
}
}
} label: {
switch selectedSession.status {
case .paused:
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
}.disabled(loading)
.alert("Error", isPresented: Binding(
get: { createError != nil },
set: { if $0 { createError = nil } }
set: { if !$0 { createError = nil } }
)) {} message: {
Text(createError?.description ?? "An unknown error occurred.")
}
@@ -83,7 +83,6 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
defer { loading = false }
do throws(DaemonError) {
if let existingSession {
// TODO: Support selecting & deleting multiple sessions at once
try await fileSync.deleteSessions(ids: [existingSession.id])
}
try await fileSync.createSession(
4 changes: 4 additions & 0 deletions Coder-Desktop/Coder-DesktopTests/Util.swift
Original file line number Diff line number Diff line change
@@ -48,6 +48,10 @@ class MockFileSyncDaemon: FileSyncDaemon {
}

func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {}

func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}

func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
}

extension Inspection: @unchecked Sendable, @retroactive InspectionEmissary {}
56 changes: 10 additions & 46 deletions Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@ public protocol FileSyncDaemon: ObservableObject {
func refreshSessions() async
func createSession(localPath: String, agentHost: String, remotePath: String) async throws(DaemonError)
func deleteSessions(ids: [String]) async throws(DaemonError)
func pauseSessions(ids: [String]) async throws(DaemonError)
func resumeSessions(ids: [String]) async throws(DaemonError)
}

@MainActor
@@ -41,6 +43,9 @@ public class MutagenDaemon: FileSyncDaemon {
private let mutagenDataDirectory: URL
private let mutagenDaemonSocket: URL

// Managing sync sessions could take a while, especially with prompting
let sessionMgmtReqTimeout: TimeAmount = .seconds(15)

// Non-nil when the daemon is running
var client: DaemonClient?
private var group: MultiThreadedEventLoopGroup?
@@ -75,6 +80,10 @@ public class MutagenDaemon: FileSyncDaemon {
return
}
await refreshSessions()
if sessionState.isEmpty {
logger.info("No sync sessions found on startup, stopping daemon")
await stop()
}
}
}

@@ -162,7 +171,7 @@ public class MutagenDaemon: FileSyncDaemon {
// Already connected
return
}
group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
Copy link
Member Author

@ethanndickson ethanndickson Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always have two outgoing gRPC requests at once, the prompter, and the actual request we're trying to make. This creates a second OS thread for gRPC to use.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh 2 os threads might be overkill

do {
channel = try GRPCChannelPool.with(
target: .unixDomainSocket(mutagenDaemonSocket.path),
@@ -252,51 +261,6 @@ public class MutagenDaemon: FileSyncDaemon {
logger.info("\(line, privacy: .public)")
}
}

public func refreshSessions() async {
guard case .running = state else { return }
// TODO: Implement
}

public func createSession(
localPath _: String,
agentHost _: String,
remotePath _: String
) async throws(DaemonError) {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
throw error
}
}
// TODO: Add session
}

public func deleteSessions(ids _: [String]) async throws(DaemonError) {
// TODO: Delete session
await stopIfNoSessions()
}

private func stopIfNoSessions() async {
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.daemonStartFailure(error))
return
}
// If there's no configured sessions, the daemon doesn't need to be running
if sessions.sessionStates.isEmpty {
logger.info("No sync sessions found")
await stop()
}
}
}

struct DaemonClient {
120 changes: 120 additions & 0 deletions Coder-Desktop/VPNLib/FileSync/FileSyncManagement.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import NIOCore

public extension MutagenDaemon {
func refreshSessions() async {
guard case .running = state else { return }
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.grpcFailure(error))
return
}
sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) }
}

func createSession(
localPath: String,
agentHost: String,
remotePath: String
) async throws(DaemonError) {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
throw error
}
}
let (stream, promptID) = try await host()
defer { stream.cancel() }
let req = Synchronization_CreateRequest.with { req in
req.prompter = promptID
req.specification = .with { spec in
spec.alpha = .with { alpha in
alpha.protocol = .local
alpha.path = localPath
}
spec.beta = .with { beta in
beta.protocol = .ssh
beta.host = agentHost
beta.path = remotePath
}
// TODO: Ingest a config from somewhere
spec.configuration = Synchronization_Configuration()
spec.configurationAlpha = Synchronization_Configuration()
spec.configurationBeta = Synchronization_Configuration()
}
}
do {
// The first creation will need to transfer the agent binary
// TODO: Because this is pretty long, we should show progress updates
// using the prompter messages
_ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}

func deleteSessions(ids: [String]) async throws(DaemonError) {
// Terminating sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}

func pauseSessions(ids: [String]) async throws(DaemonError) {
// Pausing sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}

func resumeSessions(ids: [String]) async throws(DaemonError) {
// Resuming sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
}