Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ui): implement cancellation for streaming commands like up #859

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions desktop/src/client/command.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ChildProcess, Command as ShellCommand, EventEmitter } from "@tauri-apps/api/shell"
import { ChildProcess, Command as ShellCommand, EventEmitter, Child } from "@tauri-apps/api/shell"
import { debug, isError } from "../lib"
import { Result, ResultError, Return } from "../lib/result"
import { DEVPOD_BINARY, DEVPOD_FLAG_OPTION, DEVPOD_UI_ENV_VAR } from "./constants"
Expand All @@ -12,10 +12,12 @@ export type TEventListener<TEventName extends string> = Parameters<
export type TCommand<T> = {
run(): Promise<Result<T>>
stream(listener: TStreamEventListenerFn): Promise<ResultError>
cancel(): Promise<ResultError>
}

export class Command implements TCommand<ChildProcess> {
private sidecarCommand
private sidecarCommand: ShellCommand
private childProcess?: Child
private args: string[]

constructor(args: string[]) {
Expand All @@ -26,20 +28,6 @@ export class Command implements TCommand<ChildProcess> {
this.args = args
}

public withConversion<T>(convert: (childProcess: ChildProcess) => Result<T>): TCommand<T> {
return {
run: async () => {
const result = await this.run()
if (result.err) {
return result
}

return convert(result.val)
},
stream: this.stream,
}
}

public async run(): Promise<Result<ChildProcess>> {
try {
const rawResult = await this.sidecarCommand.execute()
Expand All @@ -53,7 +41,7 @@ export class Command implements TCommand<ChildProcess> {

public async stream(listener: TStreamEventListenerFn): Promise<ResultError> {
try {
await this.sidecarCommand.spawn()
this.childProcess = await this.sidecarCommand.spawn()
await new Promise((res, rej) => {
const stdoutListener: TEventListener<"data"> = (message) => {
try {
Expand Down Expand Up @@ -87,6 +75,7 @@ export class Command implements TCommand<ChildProcess> {
const cleanup = () => {
this.sidecarCommand.stderr.removeListener("data", stderrListener)
this.sidecarCommand.stdout.removeListener("data", stdoutListener)
this.childProcess = undefined
}

this.sidecarCommand.on("close", (arg?: { code: number }) => {
Expand Down Expand Up @@ -114,6 +103,24 @@ export class Command implements TCommand<ChildProcess> {
return Return.Failed("streaming failed")
}
}

/**
* Cancel the command.
* Only works if it has been created with the `stream` method.
*/
public async cancel(): Promise<Result<undefined>> {
try {
await this.childProcess?.kill()

return Return.Ok()
} catch (e) {
if (isError(e)) {
return Return.Failed(e.message)
}

return Return.Failed("failed to cancel command")
}
}
}

export function isOk(result: ChildProcess): boolean {
Expand Down
19 changes: 18 additions & 1 deletion desktop/src/client/commandCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type TCommandCacheID = `${string}:${TActionName}`
type TCommandHandler = Readonly<{
promise: Promise<ResultError>
stream?: (streamHandler?: THandler<TStreamEventListenerFn>) => TUnsubscribeFn
cancel?: () => Promise<ResultError>
}>
type TCommandCacheStore = Map<TCommandCacheID, TCommandHandler>

Expand All @@ -25,6 +26,18 @@ export class CommandCache {
return this.store.get(cacheID)
}

public findCommandHandlerById(id: string) {
for (const [cacheID, handler] of this.store) {
const [actionID] = cacheID.split(":")

if (actionID === id) {
return handler
}
}

return undefined
}

public clear(info: TCommandCacheInfo) {
const cacheID = this.getCacheID(info)
this.store.delete(cacheID)
Expand All @@ -50,7 +63,7 @@ export class CommandCache {
return noop
}

// Replay events in-order before registering the new newHandler
// events in-order before registering the new newHandler
if (!isEmpty(events)) {
for (const event of events) {
handler.notify(event)
Expand All @@ -64,11 +77,15 @@ export class CommandCache {

return eventManager.subscribe(handler)
}
const cancel: TCommandHandler["cancel"] = () => {
return cmd.cancel()
}

const cacheID = this.getCacheID(info)
this.store.set(cacheID, {
promise,
stream,
cancel,
})

return { operation: promise, stream }
Expand Down
6 changes: 6 additions & 0 deletions desktop/src/client/workspaces/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ export class WorkspacesClient implements TDebuggable {
return unsubscribe
}

public async cancelAction(actionID: TActionID): Promise<ResultError> {
const cmdHandler = this.commandCache.findCommandHandlerById(actionID)

return cmdHandler?.cancel?.() ?? Return.Ok()
}

public syncActionLogs(actionIDs: readonly string[]) {
invoke("sync_action_logs", { actions: actionIDs })
}
Expand Down
12 changes: 12 additions & 0 deletions desktop/src/contexts/DevPodContext/action/useAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import { TActionID, TActionObj } from "./action"
type TActionResult = Readonly<{
data: TActionObj
connectOrReplay(onStream: TStreamEventListenerFn): void | VoidFunction
cancel(): void
}>

export function useAction(actionID: TActionID | undefined): TActionResult | undefined {
const isCancellingRef = useRef(false)
const viewID = useId()
const data = useSyncExternalStore(
useCallback((listener) => devPodStore.subscribe(listener), []),
Expand Down Expand Up @@ -39,6 +41,16 @@ export function useAction(actionID: TActionID | undefined): TActionResult | unde

return replay(data.id, onStream)
},
cancel: () => {
if (isCancellingRef.current) {
return
}
isCancellingRef.current = true
// could improve by setting timeout as fallback if promise doesn't resolve, let's see if this is enough
client.workspaces.cancelAction(data.targetID).finally(() => {
isCancellingRef.current = false
})
},
}
}, [data, connect, replay])
}
Expand Down
24 changes: 19 additions & 5 deletions desktop/src/views/Actions/Action.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Box } from "@chakra-ui/react"
import { Box, Button } from "@chakra-ui/react"
import { useEffect, useMemo } from "react"
import { HiStop } from "react-icons/hi2"
import { useNavigate } from "react-router"
import { useParams, useSearchParams } from "react-router-dom"
import { useStreamingTerminal } from "../../components"
import { ToolbarActions, useStreamingTerminal } from "../../components"
import { useAction } from "../../contexts"
import { Routes } from "../../routes"

Expand Down Expand Up @@ -32,8 +33,21 @@ export function Action() {
}, [searchParams, action, navigate])

return (
<Box height="calc(100% - 3rem)" width="full">
{terminal}
</Box>
<>
<ToolbarActions>
{action?.data.status === "pending" && (
<Button
variant="outline"
aria-label="Cancel action"
leftIcon={<HiStop />}
onClick={() => action.cancel()}>
Cancel
</Button>
)}
</ToolbarActions>
<Box height="calc(100% - 3rem)" width="full">
{terminal}
</Box>
</>
)
}
Loading