| Title: | Tools for Message Passing Between Processes |
|---|---|
| Description: | Provides tools for passing messages between R processes. Shiny examples are provided showing how to perform useful tasks such as: updating reactive values from within a future, progress bars for long running async tasks, and interrupting async tasks based on user input. |
| Authors: | Ian E. Fellows |
| Maintainer: | Ian E. Fellows <[email protected]> |
| License: | MIT + file LICENCE |
| Version: | 0.1.5 |
| Built: | 2026-05-28 06:02:58 UTC |
| Source: | https://github.com/fellstat/ipc |
An interruptor useful for stopping child processes.
This class is a simple wrapper around a Queue object that makes interrupt checking in future code easy to implement and read.
Methods
initialize(queue=shinyQueue()): Creates a new interruptor.
interrupt(msg="Signaled Interrupt"): Signals an interrupt.
execInterrupts(): Executes anything pushed to the queue, including interrupts.
getInterrupts(): Gets the results of executing the queue without throwing the interrupts.
new()
Creates the object.
AsyncInterruptor$new(queue = shinyQueue())
queue: The underlying queue object to use for interruption.
interrupt()
Signals an error.
AsyncInterruptor$interrupt(msg = "Signaled Interrupt")
msg: The error message.
execInterrupts()
Execute any interruptions that have been signaled
AsyncInterruptor$execInterrupts()
getInterrupts()
Get any interruptions that have been signaled without throwing them as errors
AsyncInterruptor$getInterrupts()
destroy()
Cleans up object after use
AsyncInterruptor$destroy()
clone()
The objects of this class are cloneable with this method.
AsyncInterruptor$clone(deep = FALSE)
deep: Whether to make a deep clone.
    library(ipc)
    library(future)
    strategy <- "future::multisession"
    plan(strategy)

    inter <- AsyncInterruptor$new()
    fut <- future({
      for(i in 1:100){
        Sys.sleep(.01)
        # throws an error in the child once an interrupt has been signaled
        inter$execInterrupts()
      }
    })
    inter$interrupt("Error: Stop Future")
    try(value(fut))
    inter$destroy()

    # Clean up multisession cluster
    plan(sequential)
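The difference between signaling an interrupt and executing it can also be seen within a single process. The following is a minimal sketch, assuming (as in the package example) that an AsyncInterruptor can be created outside a running Shiny session; the `outcome` variable is purely illustrative.

```r
library(ipc)

inter <- AsyncInterruptor$new()

# Signaling only queues the interrupt; nothing is thrown yet.
inter$interrupt("Stop requested")

# execInterrupts() processes the queue and raises the pending interrupt
# as an error, which is caught here for illustration.
outcome <- tryCatch({
  inter$execInterrupts()
  "no interrupt"
}, error = function(e) "interrupted")

inter$destroy()
```

In a real application the call to `execInterrupts()` would sit inside the long-running loop of the child process, as in the example above.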
A progress bar object where inc and set are usable within other processes
An async-compatible wrapper around Shiny's progress bar. It should be instantiated from the main process, but may be closed, set, and incremented from any process.
new()
Creates a new progress panel and displays it.
AsyncProgress$new( ..., queue = shinyQueue(), millis = 250, value = NULL, message = NULL, detail = NULL )
...: Additional parameters to be passed to shiny::Progress.
queue: A Queue object for message passing.
millis: How often, in milliseconds, to check for updates to the progress bar.
value: A numeric value at which to set the progress bar, relative to min and max.
message: A single-element character vector; the message to be displayed to the user, or NULL to hide the current message (if any).
detail: A single-element character vector; the detail message to be displayed to the user, or NULL to hide the current detail message (if any). The detail message will be shown with a de-emphasized appearance relative to message.
getMax()
Returns the maximum
AsyncProgress$getMax()
getMin()
Returns the minimum
AsyncProgress$getMin()
sequentialClose()
Removes the progress panel and destroys the queue. Must be called from main process.
AsyncProgress$sequentialClose()
set()
Updates the progress panel. When called the first time, the progress panel is displayed.
AsyncProgress$set(value = NULL, message = NULL, detail = NULL)
value: A numeric value at which to set the progress bar, relative to min and max.
message: A single-element character vector; the message to be displayed to the user, or NULL to hide the current message (if any).
detail: A single-element character vector; the detail message to be displayed to the user, or NULL to hide the current detail message (if any). The detail message will be shown with a de-emphasized appearance relative to message.
inc()
Like set, this updates the progress panel. The difference is
that inc increases the progress bar by amount, instead
of setting it to a specific value.
AsyncProgress$inc(amount = 0.1, message = NULL, detail = NULL)
amount: The size of the increment.
message: A single-element character vector; the message to be displayed to the user, or NULL to hide the current message (if any).
detail: A single-element character vector; the detail message to be displayed to the user, or NULL to hide the current detail message (if any). The detail message will be shown with a de-emphasized appearance relative to message.
close()
Fires a close signal and may be used from any process.
AsyncProgress$close()
clone()
The objects of this class are cloneable with this method.
AsyncProgress$clone(deep = FALSE)
deep: Whether to make a deep clone.
    ## Only run examples in interactive R sessions
    if (interactive()) {
      library(shiny)
      library(ipc)
      library(future)
      library(promises) # provides the %...>% promise pipe used below
      plan(multisession)

      ui <- fluidPage(
        actionButton("run","Run"),
        tableOutput("dataset")
      )

      server <- function(input, output, session) {

        dat <- reactiveVal()
        observeEvent(input$run, {
          progress <- AsyncProgress$new(session, min=1, max=15)
          future({
            for (i in 1:15) {
              progress$set(value = i)
              Sys.sleep(0.5)
            }
            progress$close()
            cars
          }) %...>% dat
          NULL # return something other than the future so the UI is not blocked
        })

        output$dataset <- renderTable({
          req(dat())
        })
      }

      shinyApp(ui, server)
    }
A Class for reading and executing tasks from a source
handlers: A list of handlers.
stopped: Whether the consumer is currently stopped.
laterHandle: A callback handle.
new()
Creates the object.
Consumer$new(source)
source: A source, e.g. TextFileSource.
setSource()
Sets the source.
Consumer$setSource(source)
source: A source, e.g. TextFileSource.
getSource()
Gets the source.
Consumer$getSource()
consume()
Executes all unprocessed signals fired to source from a Producer.
If throwErrors is TRUE, the first error encountered is thrown
after executing all signals. Signals are executed in the env environment.
If env is NULL, the environment set at initialization is used.
Consumer$consume(throwErrors = TRUE, env = parent.frame())
throwErrors: Whether errors should be thrown (TRUE) or caught (FALSE).
env: The execution environment.
start()
Starts executing consume every millis milliseconds. throwErrors
and env are passed down to consume.
Consumer$start(millis = 250, env = parent.frame())
millis: The interval between executions of consume, in milliseconds.
env: The execution environment.
stop()
Stops the periodic execution of consume.
Consumer$stop()
addHandler()
Adds a handler, func, for the signal 'signal'.
Consumer$addHandler(func, signal)
func: A function that takes three parameters: 1. the signal, 2. the message object, and 3. the evaluation environment.
signal: A string to bind the function to.
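As a minimal sketch of the three-parameter handler signature described above, the following registers a handler for a made-up signal name, "greet", and fires that signal from the producer of the same queue. The signal name, the `last_greeting` variable, and the separate `work_env` environment are illustrative assumptions, not part of the package.

```r
library(ipc)

q <- queue()

# The handler receives the signal name, the object sent with the signal,
# and the environment that consume() was asked to evaluate in.
q$consumer$addHandler(function(signal, obj, env) {
  assign("last_greeting", obj, envir = env)
}, "greet")

# Send the custom signal together with an object.
q$producer$fire("greet", "hello")

# Process pending signals; the handler writes into work_env.
work_env <- new.env()
q$consumer$consume(env = work_env)
greeting <- get("last_greeting", envir = work_env)

q$destroy()
```

Passing an explicit environment to `consume()` keeps the handler's side effects out of the calling frame, which can make them easier to inspect.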
clearHandlers()
Removes all handlers.
Consumer$clearHandlers()
removeHandler()
Removes a single handler.
Consumer$removeHandler(signal, index)
signal: The signal of the handler.
index: The index of the handler to remove from the signal.
initHandlers()
Adds default handlers.
Consumer$initHandlers()
finalize()
Cleans up the object.
Consumer$finalize()
clone()
The objects of this class are cloneable with this method.
Consumer$clone(deep = FALSE)
deep: Whether to make a deep clone.
Get/set the class used to sink/read from the file system
defaultSource(sourceClass)
sourceClass: An R6 object
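A short sketch of the get/set pattern, assuming that calling `defaultSource()` with no argument returns the currently registered class, as the `defaultSource()$new()` default of `queue()` suggests. The expectation that the default is TextFileSource follows from the statement that txtq is the default backend.

```r
library(ipc)

# Getter: with no argument, the current source class is returned.
source_class <- defaultSource()

# The returned class can be instantiated directly.
src <- source_class$new()
is_text_source <- inherits(src, "TextFileSource")
src$destroy()

# Setter: here the current class is simply registered again.
defaultSource(source_class)
```

To switch every subsequently created queue to another backend, a different class such as RedisSource would be passed instead.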
A Class for sending signals to a source
new()
Creates a Producer object linked to the source.
Producer$new(source)
source: A source.
setSource()
Setter for source.
Producer$setSource(source)
source: A source.
getSource()
Getter for source.
Producer$getSource()
fire()
Sends a signal to the source with associated object obj.
Producer$fire(signal, obj = NA)
signal: A string signal to send.
obj: The object to associate with the signal.
fireEval()
Signals for execution of the expression expr with values from
the environment (or list) env substituted in.
Producer$fireEval(expr, env)
expr: An expression to evaluate.
env: An environment or list for substitution.
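The substitution step can be illustrated within a single process. This is a minimal sketch that wires a Producer and a Consumer to the same TextFileSource by hand; the variable names `n`, `total`, and `target` are illustrative only.

```r
library(ipc)

# A producer and a consumer that share one source.
src  <- TextFileSource$new()
prod <- Producer$new(src)
cons <- Consumer$new(src)

n <- 21
# The symbol n inside the expression is replaced by its value from the
# list before the signal is sent, so the consumer does not need access to n.
prod$fireEval(total <- 2 * n, env = list(n = n))

# Evaluate the pending expression in a dedicated environment.
target <- new.env()
cons$consume(env = target)
total <- get("total", envir = target)

src$destroy()
```

Because the value travels with the expression, the same pattern works when the producer runs in a child process and the consumer in the main session.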
fireDoCall()
Signals for execution of the function whose string value is name
with the parameters in list param.
Producer$fireDoCall(name, param)
name: The name of the function.
param: A list of function parameters.
fireCall()
Signals for execution of the function whose string value is name
with the parameters ....
Producer$fireCall(name, ...)
name: The name of the function.
...: The arguments to the function.
clone()
The objects of this class are cloneable with this method.
Producer$clone(deep = FALSE)
deep: Whether to make a deep clone.
Creates a Queue object for inter-process communication.
Its members producer and consumer are the main entry points for
sending and receiving messages respectively.
queue(source = defaultSource()$new(), producer = Producer$new(source), consumer = Consumer$new(source))
source: The source for reading and writing the queue.
producer: The producer for the source.
consumer: The consumer of the source.
This function creates a queue object for communication between different R processes,
including forks of the same process. By default, it uses the txtq package as its backend.
Technically, the information is sent through temporary files, created in a new directory
inside the session-specific temporary folder (see tempfile).
This requires that the new directory is writeable. This is normally the case, but
if Sys.umask forbids writing, the communication fails with an error.
producer: A Producer object.
consumer: A Consumer object.
new()
Create a Queue object
Queue$new(source, prod, cons)
source: The source to use for communication.
prod: A Producer object.
cons: A Consumer object.
destroy()
Cleans up the object after use.
Queue$destroy()
clone()
The objects of this class are cloneable with this method.
Queue$clone(deep = FALSE)
deep: Whether to make a deep clone.
    ## Not run:
    library(ipc)
    library(parallel)
    library(future)
    library(promises)
    plan(multisession)

    q <- queue()

    # communicate from main session to child
    fut <- future({
      for(i in 1:1000){
        Sys.sleep(.1)
        q$consumer$consume()
      }
    })
    q$producer$fireEval(stop("Stop that child"))
    cat(try(value(fut)))

    # Communicate from child to main session
    j <- 0
    fut <- future({
      for(i in 1:10){
        Sys.sleep(.2)
        # set j in the main thread substituting i into the expression
        q$producer$fireEval(j <- i, env=list(i=i))
      }
    })
    while(j < 10){
      q$consumer$consume() # collect and execute assignments
      cat("j = ", j, "\n")
      Sys.sleep(.1)
    }

    fut <- future({
      for(i in 1:10){
        Sys.sleep(.2)
        # print i in the main thread substituting i into the expression
        q$producer$fireEval(print(i), env=list(i=i))
      }
    })
    q$consumer$start() # execute `consume` at regular intervals

    # clean up
    q$destroy()
    ## End(Not run)
Get/set redis configuration
redisConfig(config)
config: A configuration list for redux::hiredis
Get/set the function used to generate redis queue id strings
redisIdGenerator(generator)
generator: A function generating id strings
Reads and writes the queue to a redis db
new()
Creates a redis source object.
RedisSource$new(id = redisIdGenerator()(), config = redisConfig())
id: An identifier to use for the queue.
config: A configuration list for redux::hiredis.
getRedisConnection()
Returns the underlying redis connection.
RedisSource$getRedisConnection()
pop()
Removes n items from the source and returns them.
RedisSource$pop(n = -1)
n: The number of records to pop (-1 indicates all available).
push()
Adds an item to the source.
RedisSource$push(msg, obj)
msg: A string indicating the signal.
obj: The object to associate with the signal.
destroy()
Cleans up source after use.
RedisSource$destroy()
finalize()
Finalizer for the object.
RedisSource$finalize()
clone()
The objects of this class are cloneable with this method.
RedisSource$clone(deep = FALSE)
deep: Whether to make a deep clone.
A Consumer class with common task handlers useful in Shiny apps
In addition to 'eval' and 'function' signals, ShinyConsumer objects process 'interrupt' and 'notify' signals for throwing errors and displaying Shiny notifications.
ipc::Consumer -> ShinyConsumer
initHandlers()
Adds default handlers
ShinyConsumer$initHandlers()
clone()
The objects of this class are cloneable with this method.
ShinyConsumer$clone(deep = FALSE)
deep: Whether to make a deep clone.
Run Example Shiny Apps
shinyExample(application = c("progress", "changeReactive", "cancel"))
application: The example to run.
'progress' is an example application with a long-running analysis that is cancelable and has a progress bar. 'changeReactive' is the Old Faithful example, but with the histogram colors changing over time. 'cancel' is an example with a cancelable long-running process.
A Producer with methods specific for Shiny
A Producer object with additional methods for firing interrupts, shiny notifications, and reactive value assignments.
ipc::Producer -> ShinyProducer
fireInterrupt()
Sends an error with message msg.
ShinyProducer$fireInterrupt(msg = "Interrupt")
msg: A string.
fireNotify()
Sends a signal to create a shiny Notification with message msg.
ShinyProducer$fireNotify(msg = "Notification")
msg: A string.
fireAssignReactive()
Signals for assignment of value to the reactive name.
ShinyProducer$fireAssignReactive(name, value)
name: The name of the reactive value.
value: The value to assign to the reactive.
clone()
The objects of this class are cloneable with this method.
ShinyProducer$clone(deep = FALSE)
deep: Whether to make a deep clone.
Create a Queue object
shinyQueue(source = defaultSource()$new(), producer = ShinyProducer$new(source), consumer = ShinyConsumer$new(source), session = shiny::getDefaultReactiveDomain())
source: The source for reading and writing the queue.
producer: The producer for the source.
consumer: The consumer of the source.
session: A Shiny session.
Creates a Queue object for use with Shiny, backed by TextFileSource, ShinyProducer, and ShinyConsumer objects by default. The object will be cleaned up and destroyed on session end.
Stops a future run in a multicore plan
stopMulticoreFuture(x)
x: The MulticoreFuture
This function sends terminate and kill signals to the process running the future,
and will only work for futures run on a multicore plan. This approach is not
recommended for cases where you can listen for interrupts within the future
(with AsyncInterruptor). However, for cases where long-running code is
in an external library over which you have no control, this can be the only way
to terminate the execution.
Note that multicore is not supported on Windows machines or within RStudio.
Get/set the location for temporary files
tempFileGenerator(tempfile)
tempfile: A function generating a working file path (e.g. tempfile)
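A hedged sketch of redirecting queue files to a specific directory. It assumes that calling `tempFileGenerator()` with no argument returns the current generator function, as the `tempFileGenerator()()` default of TextFileSource suggests; the `ipc-queues` directory name and the `queue-` prefix are hypothetical.

```r
library(ipc)

# Keep the current generator so it can be restored afterwards.
previous <- tempFileGenerator()

# Hypothetical directory in which queue files should be created.
queue_dir <- file.path(tempdir(), "ipc-queues")
dir.create(queue_dir, showWarnings = FALSE)

# Register a generator that returns a fresh path inside queue_dir.
tempFileGenerator(function() tempfile(pattern = "queue-", tmpdir = queue_dir))

# New text-file sources would now draw their file paths from this generator.
path <- tempFileGenerator()()

# Restore the previous generator.
tempFileGenerator(previous)
```

This may be useful when the session temporary folder is not writeable by child processes, since the chosen directory must be accessible to every process that uses the queue.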
Reads and writes the queue to a text file
A wrapper around txtq. This object saves signals
and associated objects to a queue, and retrieves them
for processing.
new()
Creates a TextFileSource
TextFileSource$new(filePath = tempFileGenerator()())
filePath: The path to the file.
pop()
Removes n items from the source and returns them.
TextFileSource$pop(n = -1)
n: The number of records to pop (-1 indicates all available).
push()
Adds an item to the source.
TextFileSource$push(msg, obj)
msg: A string indicating the signal.
obj: The object to associate with the signal.
destroy()
Cleans up source after use.
TextFileSource$destroy()
clone()
The objects of this class are cloneable with this method.
TextFileSource$clone(deep = FALSE)
deep: Whether to make a deep clone.