Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ URL: https://processx.r-lib.org, https://github.com/r-lib/processx#readme
BugReports: https://github.com/r-lib/processx/issues
Depends: R (>= 3.4.0)
Imports:
ps (>= 1.2.0),
ps (>= 1.7.5.9001),
R6,
utils
Suggests:
Expand All @@ -36,6 +36,8 @@ Suggests:
testthat (>= 3.0.0),
webfakes,
withr
Remotes:
r-lib/ps@kill-tree-grace
Encoding: UTF-8
RoxygenNote: 7.2.3
Roxygen: list(markdown = TRUE)
Expand Down
5 changes: 5 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# processx (development version)

* Processes are now killed in parallel with a period of grace on
garbage collection (if `cleanup_tree` is `TRUE`) and on session
quit. The delay can be controlled with the new `cleanup_grace`
argument.

* The `grace` argument of the `kill()` method is now active on Unix
platforms. processx first tries to kill with `SIGTERM` with a
timeout of `grace` seconds. After the timeout, `SIGKILL` is sent as
Expand Down
44 changes: 44 additions & 0 deletions R/finalize.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
process_finalize <- function(private) {
ps <- process_cleanup_list(private)
ps::ps_kill_parallel(ps, private$cleanup_grace)
}

process_cleanup_list <- function(private) {
ps <- list()

if (private$cleanup) {
# Can't be created in advance because the ps finalizer might run first
handle <- ps::ps_handle(private$pid, as.POSIXct(private$starttime))
ps <- c(ps, list(handle))
}

if (private$cleanup_tree) {
ps <- c(ps, ps::ps_find_tree(private$tree_id))
}

ps
}

session_finalize <- function(node) {
ps <- list()
grace <- 0

while (!node_is_root(node)) {
private <- wref_key(node_value(node))
ps <- c(ps, process_cleanup_list(private))

if (!is.null(private$cleanup_grace)) {
grace <- max(grace, private$cleanup_grace)
}

node <- node_next(node)
}

ps::ps_kill_parallel(ps, grace)
}

wref_key <- function(x) .Call(c_processx__wref_key, x)
node_is_root <- function(x) is.null(node_next(x))
node_prev <- function(x) x[[1]]
node_next <- function(x) x[[2]]
node_value <- function(x) x[[3]]
5 changes: 4 additions & 1 deletion R/initialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ process_initialize <- function(self, private, command, args,
c_processx_exec,
command, c(command, args), pty, pty_options,
connections, env, windows_verbatim_args, windows_hide_window,
windows_detached_process, private, cleanup, cleanup_grace, wd, encoding,
windows_detached_process, private, cleanup, wd, encoding,
paste0("PROCESSX_", private$tree_id, "=YES")
)

Expand All @@ -155,6 +155,9 @@ process_initialize <- function(self, private, command, args,
chain_call(c_processx__proc_start_time, private$status)
if (private$starttime == 0) private$starttime <- Sys.time()

# Needed for cleaning up
private$pid <- self$get_pid()

## Need to close this, otherwise the child's end of the pipe
## will not be closed when the child exits, and then we cannot
## poll it.
Expand Down
6 changes: 1 addition & 5 deletions R/process.R
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,6 @@ process <- R6::R6Class(
#' collected. If requested so in the process constructor, then it
#' eliminates all processes in the process's subprocess tree.

finalize = function() {
if (!is.null(private$tree_id) && private$cleanup_tree &&
ps::ps_is_supported()) self$kill_tree(grace = private$cleanup_grace)
},

#' @description
#' Terminate the process. It also terminate all of its child
#' processes, except if they have created a new process group (on Unix),
Expand Down Expand Up @@ -671,6 +666,7 @@ process <- R6::R6Class(
windows_hide_window = NULL,

status = NULL, # C file handle
pid = NULL, # pid for cleanup

supervised = FALSE, # Whether process is tracked by supervisor

Expand Down
16 changes: 4 additions & 12 deletions man/process.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/Makevars
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ OBJECTS = init.o poll.o errors.o processx-connection.o \
processx-vector.o create-time.o base64.o \
unix/childlist.o unix/connection.o \
unix/processx.o unix/sigchld.o unix/utils.o \
unix/named_pipe.o cleancall.o
unix/named_pipe.o cleancall.o utils.o

.PHONY: all clean

Expand Down
4 changes: 3 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ SEXP run_testthat_tests(void);
SEXP processx__echo_on(void);
SEXP processx__echo_off(void);
SEXP processx__set_boot_time(SEXP);
SEXP processx__wref_key(SEXP);

static const R_CallMethodDef callMethods[] = {
CLEANCALL_METHOD_RECORD,
{ "processx_exec", (DL_FUNC) &processx_exec, 15 },
{ "processx_exec", (DL_FUNC) &processx_exec, 14 },
{ "processx_wait", (DL_FUNC) &processx_wait, 3 },
{ "processx_is_alive", (DL_FUNC) &processx_is_alive, 2 },
{ "processx_get_exit_status", (DL_FUNC) &processx_get_exit_status, 2 },
Expand All @@ -33,6 +34,7 @@ static const R_CallMethodDef callMethods[] = {
{ "processx_write_named_pipe", (DL_FUNC) &processx_write_named_pipe, 2 },
{ "processx__proc_start_time", (DL_FUNC) &processx__proc_start_time, 1 },
{ "processx__set_boot_time", (DL_FUNC) &processx__set_boot_time, 1 },
{ "processx__wref_key", (DL_FUNC) &processx__wref_key, 1 },

{ "processx_connection_create", (DL_FUNC) &processx_connection_create, 2 },
{ "processx_connection_read_chars", (DL_FUNC) &processx_connection_read_chars, 2 },
Expand Down
10 changes: 8 additions & 2 deletions src/processx.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ extern "C" {
SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
SEXP connections, SEXP env, SEXP windows_verbatim_args,
SEXP windows_hide_window, SEXP windows_detached_process,
SEXP private_, SEXP cleanup, SEXP cleanup_signal,
SEXP wd, SEXP encoding, SEXP tree_id);
SEXP private_, SEXP cleanup, SEXP wd, SEXP encoding,
SEXP tree_id);
SEXP processx_wait(SEXP status, SEXP timeout, SEXP name);
SEXP processx_is_alive(SEXP status, SEXP name);
SEXP processx_get_exit_status(SEXP status, SEXP name);
Expand Down Expand Up @@ -117,4 +117,10 @@ typedef struct {
}
#endif

#define r_no_return __attribute__ ((noreturn))

r_no_return void r_unwind(SEXP x);
SEXP r_unwind_protect(void (*fn)(void *data), void *data);
SEXP r_safe_eval(SEXP expr, SEXP env, SEXP *out);

#endif
4 changes: 2 additions & 2 deletions src/unix/processx-unix.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ typedef struct processx_handle_s {
int fd2; /* readable */
int waitpipe[2]; /* use it for wait() with timeout */
int cleanup;
int cleanup_signal;
double cleanup_grace;
SEXP r6_private;
double create_time;
SEXP finalizer_node;
processx_connection_t *pipes[3];
int ptyfd;
} processx_handle_t;
Expand Down
119 changes: 101 additions & 18 deletions src/unix/processx.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static void processx__child_init(processx_handle_t *handle, SEXP connections,
processx_options_t *options,
const char *tree_id);

static SEXP processx__make_handle(SEXP private, int cleanup, double cleanup_grace);
static SEXP processx__make_handle(SEXP private, int cleanup);
static void processx__handle_destroy(processx_handle_t *handle);
void processx__create_connections(processx_handle_t *handle, SEXP private,
const char *encoding);
Expand Down Expand Up @@ -336,6 +336,40 @@ SEXP c_processx_kill_data(void *payload) {
return R_NilValue;
}

static
SEXP finalizer_call(SEXP private) {
static SEXP finalize_fn = NULL;
if (!finalize_fn) {
finalize_fn = lang3(install(":::"),
install("processx"),
install("process_finalize"));
R_PreserveObject(finalize_fn);
}

return lang2(finalize_fn, private);
}

static SEXP session_finalizer_list = NULL;

// These need to be macros to be considered protectors by rchk
#define node_poke_prev(NODE, PREV) SET_VECTOR_ELT((NODE), 0, (PREV))
#define node_poke_next(NODE, NEXT) SET_VECTOR_ELT((NODE), 1, (NEXT))
#define node_poke_value(NODE, VALUE) SET_VECTOR_ELT((NODE), 2, (VALUE))

static SEXP node_prev(SEXP node) { return VECTOR_ELT(node, 0); }
static SEXP node_next(SEXP node) { return VECTOR_ELT(node, 1); }

static
SEXP new_node(SEXP prev, SEXP next, SEXP value) {
SEXP out = allocVector(VECSXP, 3);

node_poke_prev(out, prev);
node_poke_next(out, next);
node_poke_value(out, value);

return out;
}

void processx__finalizer(SEXP status) {
processx_handle_t *handle = (processx_handle_t*) R_ExternalPtrAddr(status);

Expand All @@ -348,14 +382,21 @@ void processx__finalizer(SEXP status) {
if (!handle)
return;

// FIXME: Do we need cleancall here?
if (handle->cleanup) {
struct cleanup_kill_data data = {
.status = status,
.grace = handle->cleanup_grace,
.name = R_NilValue
};
r_with_cleanup_context(c_processx_kill_data, &data);
SEXP call = PROTECT(finalizer_call(handle->r6_private));
SEXP err = r_safe_eval(call, R_BaseEnv, NULL);
UNPROTECT(1);

/* Remove node from session finalizer list */
SEXP node = handle->finalizer_node;
SEXP prev = node_prev(node);
SEXP next = node_next(node);

if (prev != R_NilValue) {
node_poke_next(prev, next);
}
node_poke_prev(next, prev);
if (node == session_finalizer_list) {
session_finalizer_list = next;
}

/* Note: if no cleanup is requested, then we still have a sigchld
Expand All @@ -365,21 +406,64 @@ void processx__finalizer(SEXP status) {
/* Deallocate memory */
R_ClearExternalPtr(status);
processx__handle_destroy(handle);

if (err) {
r_unwind(err);
}
}

static
void processx__session_finalizer(SEXP _) {
static SEXP finalize_fn = NULL;
if (!finalize_fn) {
finalize_fn = lang3(install(":::"),
install("processx"),
install("session_finalize"));
R_PreserveObject(finalize_fn);
}

SEXP call = PROTECT(lang2(finalize_fn, session_finalizer_list));
eval(call, R_BaseEnv);
UNPROTECT(1);
}

static SEXP processx__make_handle(SEXP private, int cleanup, double cleanup_grace) {
static
void processx__register_finalizer(SEXP status, processx_handle_t *handle) {
if (!session_finalizer_list) {
// This root node is never popped and protects the rest of the list
session_finalizer_list = new_node(R_NilValue, R_NilValue, R_NilValue);
R_PreserveObject(session_finalizer_list);
R_RegisterCFinalizerEx(R_BaseEnv, &processx__session_finalizer, 1);
}

// GC finalizer
R_RegisterCFinalizerEx(status, &processx__finalizer, 0);

// Session finalizer
SEXP private_weakref = R_MakeWeakRef(handle->r6_private, R_NilValue, R_NilValue, 0);
PROTECT(private_weakref);

SEXP node = new_node(R_NilValue, session_finalizer_list, private_weakref);
node_poke_prev(session_finalizer_list, node);
session_finalizer_list = node;
handle->finalizer_node = node;

UNPROTECT(1);
}

static SEXP processx__make_handle(SEXP private, int cleanup) {
processx_handle_t * handle;
SEXP result;

handle = (processx_handle_t*) malloc(sizeof(processx_handle_t));
if (!handle) { R_THROW_ERROR("Cannot make processx handle, out of memory"); }
memset(handle, 0, sizeof(processx_handle_t));

handle->waitpipe[0] = handle->waitpipe[1] = -1;
handle->r6_private = private;

result = PROTECT(R_MakeExternalPtr(handle, private, R_NilValue));
R_RegisterCFinalizerEx(result, processx__finalizer, 1);
handle->cleanup = cleanup;
handle->cleanup_grace = cleanup_grace;
processx__register_finalizer(result, handle);

UNPROTECT(1);
return result;
Expand Down Expand Up @@ -426,14 +510,12 @@ void processx__make_socketpair(int pipe[2], const char *exe) {
SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,
SEXP connections, SEXP env, SEXP windows_verbatim_args,
SEXP windows_hide_window, SEXP windows_detached_process,
SEXP private, SEXP cleanup, SEXP cleanup_grace, SEXP wd,
SEXP encoding, SEXP tree_id) {
SEXP private, SEXP cleanup, SEXP wd, SEXP encoding,
SEXP tree_id) {

char *ccommand = processx__tmp_string(command, 0);
char **cargs = processx__tmp_character(args);
char **cenv = isNull(env) ? 0 : processx__tmp_character(env);
int ccleanup = INTEGER(cleanup)[0];
double ccleanup_grace = REAL(cleanup_grace)[0];

const int cpty = LOGICAL(pty)[0];
const char *cencoding = CHAR(STRING_ELT(encoding, 0));
Expand Down Expand Up @@ -468,7 +550,8 @@ SEXP processx_exec(SEXP command, SEXP args, SEXP pty, SEXP pty_options,

processx__setup_sigchld();

result = PROTECT(processx__make_handle(private, ccleanup, ccleanup_grace));
int ccleanup = LOGICAL(cleanup)[0];
result = PROTECT(processx__make_handle(private, ccleanup));
handle = R_ExternalPtrAddr(result);

if (cpty) {
Expand Down
Loading