Title: | Tools for Message Passing Between Processes |
---|---|
Description: | Provides tools for passing messages between R processes. Shiny examples are provided showing how to perform useful tasks such as: updating reactive values from within a future, progress bars for long running async tasks, and interrupting async tasks based on user input. |
Authors: | Ian E. Fellows |
Maintainer: | Ian E. Fellows <[email protected]> |
License: | MIT + file LICENCE |
Version: | 0.1.5 |
Built: | 2024-12-12 04:57:38 UTC |
Source: | https://github.com/fellstat/ipc |
Provides tools for passing messages between R processes. Shiny Examples are provided showing how to perform useful tasks such as: updating reactive values from within a future, progress bars for long running async tasks, and interrupting async tasks based on user input.
Ian Fellows [email protected]
An interruptor useful for stopping child processes.
An interruptor useful for stopping child processes.
This class is a simple wrapper around a Queue object making adding interrupt checking to future code easy to implement and read.
Methods
initialize(queue=shinyQueue())
Creates a new interruptor.
interrupt(msg="Signaled Interrupt")
Signals an interrupt
execInterrupts()
Executes anything pushed to the queue, including interrupts.
getInterrupts()
Gets the result of the queue's executing, not throwing the interrupts.
new()
Create the object
AsyncInterruptor$new(queue = shinyQueue())
queue
The underlying queue object to use for interruption
interrupt()
signal an error
AsyncInterruptor$interrupt(msg = "Signaled Interrupt")
msg
The error message
execInterrupts()
Execute any interruptions that have been signaled
AsyncInterruptor$execInterrupts()
getInterrupts()
Get any interruptions that have been signaled without throwing them as errors
AsyncInterruptor$getInterrupts()
destroy()
Cleans up object after use
AsyncInterruptor$destroy()
clone()
The objects of this class are cloneable with this method.
AsyncInterruptor$clone(deep = FALSE)
deep
Whether to make a deep clone.
library(future) strategy <- "future::multisession" plan(strategy) inter <- AsyncInterruptor$new() fut <- future({ for(i in 1:100){ Sys.sleep(.01) inter$execInterrupts() } }) inter$interrupt("Error: Stop Future") try(value(fut)) inter$destroy() # Clean up multisession cluster plan(sequential)
library(future) strategy <- "future::multisession" plan(strategy) inter <- AsyncInterruptor$new() fut <- future({ for(i in 1:100){ Sys.sleep(.01) inter$execInterrupts() } }) inter$interrupt("Error: Stop Future") try(value(fut)) inter$destroy() # Clean up multisession cluster plan(sequential)
A progress bar object where inc and set are usable within other processes
A progress bar object where inc and set are usable within other processes
An async compatible wrapper around Shiny's progress bar. It should be instatiated from the main process, but may be closed, set and incremented from any process.
new()
Creates a new progress panel and displays it.
AsyncProgress$new( ..., queue = shinyQueue(), millis = 250, value = NULL, message = NULL, detail = NULL )
...
Additional parameters to be passed to Shiny::Progress
queue
A Queue object for message passing
millis
How often in milliseconds should updates to the progress bar be checked for.
value
A numeric value at which to set
the progress bar, relative to min
and max
.
message
A single-element character vector; the message to be
displayed to the user, or NULL
to hide the current message
(if any).
detail
A single-element character vector; the detail message
to be displayed to the user, or NULL
to hide the current
detail message (if any). The detail message will be shown with a
de-emphasized appearance relative to message
.
getMax()
Returns the maximum
AsyncProgress$getMax()
getMin()
Returns the minimum
AsyncProgress$getMin()
sequentialClose()
Removes the progress panel and destroys the queue. Must be called from main process.
AsyncProgress$sequentialClose()
set()
Updates the progress panel. When called the first time, the progress panel is displayed.
AsyncProgress$set(value = NULL, message = NULL, detail = NULL)
value
A numeric value at which to set
message
A single-element character vector; the message to be
displayed to the user, or NULL
to hide the current message
(if any).
detail
A single-element character vector; the detail message
to be displayed to the user, or NULL
to hide the current
detail message (if any). The detail message will be shown with a
de-emphasized appearance relative to message
.
inc()
Like set
, this updates the progress panel. The difference is
that inc
increases the progress bar by amount
, instead
of setting it to a specific value.
AsyncProgress$inc(amount = 0.1, message = NULL, detail = NULL)
amount
the size of the increment.
message
A single-element character vector; the message to be
displayed to the user, or NULL
to hide the current message
(if any).
detail
A single-element character vector; the detail message
to be displayed to the user, or NULL
to hide the current
detail message (if any). The detail message will be shown with a
de-emphasized appearance relative to message
.
close()
Fires a close signal and may be used from any process.
AsyncProgress$close()
clone()
The objects of this class are cloneable with this method.
AsyncProgress$clone(deep = FALSE)
deep
Whether to make a deep clone.
## Only run examples in interactive R sessions if (interactive()) { library(shiny) library(future) plan(multisession) ui <- fluidPage( actionButton("run","Run"), tableOutput("dataset") ) server <- function(input, output, session) { dat <- reactiveVal() observeEvent(input$run, { progress <- AsyncProgress$new(session, min=1, max=15) future({ for (i in 1:15) { progress$set(value = i) Sys.sleep(0.5) } progress$close() cars }) %...>% dat NULL #return something other than the future so the UI is not blocked }) output$dataset <- renderTable({ req(dat()) }) } shinyApp(ui, server) }
## Only run examples in interactive R sessions if (interactive()) { library(shiny) library(future) plan(multisession) ui <- fluidPage( actionButton("run","Run"), tableOutput("dataset") ) server <- function(input, output, session) { dat <- reactiveVal() observeEvent(input$run, { progress <- AsyncProgress$new(session, min=1, max=15) future({ for (i in 1:15) { progress$set(value = i) Sys.sleep(0.5) } progress$close() cars }) %...>% dat NULL #return something other than the future so the UI is not blocked }) output$dataset <- renderTable({ req(dat()) }) } shinyApp(ui, server) }
A Class for reading and executing tasks from a source
A Class for reading and executing tasks from a source
handlers
A list of handlers
stopped
Is currently stopped.
laterHandle
A callback handle.
new()
Creates the object.
Consumer$new(source)
source
A source, e.g. TextFileSource.
setSource()
Sets the source.
Consumer$setSource(source)
source
A source, e.g. TextFileSource.
getSource()
Gets the source.
Consumer$getSource()
consume()
Executes all (unprocessed) signals fired to source from a Producer.
if throwErrors
is TRUE, the first error encountered is thrown
after executing all signals. Signals are executed in the env
environment.
If env
is NULL, the environment set at initialization is used.
Consumer$consume(throwErrors = TRUE, env = parent.frame())
throwErrors
Should errors be thrown or caught.
env
The execution environment.
start()
Starts executing consume
every millis
milliseconds. throwErrors
and env
are passed down to consume
Consumer$start(millis = 250, env = parent.frame())
millis
milliseconds.
env
The execution environment.
stop()
Stops the periodic execution of consume
.
Consumer$stop()
addHandler()
Adds a handler for 'signal'. func
Consumer$addHandler(func, signal)
func
The function which takes three parameters: 1. the signal, 2. the message object, and 3. the evaluation environment.
signal
A string to bind the function to.
clearHandlers()
Removes all handler.s
Consumer$clearHandlers()
removeHandler()
Removes a single handler.
Consumer$removeHandler(signal, index)
signal
The signal of the handler.
index
The index of the handler to remove from the signal.
initHandlers()
Adds default handlers.
Consumer$initHandlers()
finalize()
cleans up object.
Consumer$finalize()
clone()
The objects of this class are cloneable with this method.
Consumer$clone(deep = FALSE)
deep
Whether to make a deep clone.
Get/set the class used to sink/read from the file system
defaultSource(sourceClass)
defaultSource(sourceClass)
sourceClass |
An R6 object |
A Class for sending signals to a source
A Class for sending signals to a source
new()
Creates a Producer object linked to the source
.
Producer$new(source)
source
A source.
setSource()
Setter for source.
Producer$setSource(source)
source
A source.
getSource()
Getter for source.
Producer$getSource()
fire()
Sends a signal to the source with associates object obj
.
Producer$fire(signal, obj = NA)
signal
A string signal to send.
obj
The object to associate with the signal.
fireEval()
Signals for execution of the expression obj
with values from
the environment (or list) env
substituted in.
Producer$fireEval(expr, env)
expr
An expression to evaluate.
env
An environment or list for substitution
fireDoCall()
Signals for execution of the function whose string value is name
with the parameters in list param
.
Producer$fireDoCall(name, param)
name
the name of the function
param
A list of function parameters.
fireCall()
Signals for execution of the function whose string value is name
with the parameters ...
.
Producer$fireCall(name, ...)
name
the name of the function
...
The arguments to the function.
clone()
The objects of this class are cloneable with this method.
Producer$clone(deep = FALSE)
deep
Whether to make a deep clone.
Creates a Queue object for inter-process communication.
Its members producer
and consumer
are the main entry points for
sending and receiving messages respectively.
queue( source = defaultSource()$new(), producer = Producer$new(source), consumer = Consumer$new(source) )
queue( source = defaultSource()$new(), producer = Producer$new(source), consumer = Consumer$new(source) )
source |
The source for reading and writing the queue |
producer |
The producer for the source |
consumer |
The consumer of the source |
This function creates a queue object for communication between different R processes,
including forks of the same process. By default, it uses txtq
backage as its backend.
Technically, the information is sent through temporary files, created in a new directory
inside the session-specific temporary folder (see tempfile
).
This requires that the new directory is writeable, this is normally the case but
if Sys.umask
forbids writing, the communication fails with an error.
producer
A Producer object
consumer
a Consumer object.
new()
Create a Queue object
Queue$new(source, prod, cons)
source
The source to use for communication.
prod
A Producer object.
cons
A Consumer object.
destroy()
clean up object after use.
Queue$destroy()
clone()
The objects of this class are cloneable with this method.
Queue$clone(deep = FALSE)
deep
Whether to make a deep clone.
## Not run: library(parallel) library(future) library(promises) plan(multisession) q <- queue() # communicate from main session to child fut <- future({ for(i in 1:1000){ Sys.sleep(.1) q$consumer$consume() } }) q$producer$fireEval(stop("Stop that child")) cat(try(value(fut))) # Communicate from child to main session j <- 0 fut <- future({ for(i in 1:10){ Sys.sleep(.2) # set j in the main thread substituting i into the expression q$producer$fireEval(j <- i, env=list(i=i)) } }) while(j < 10){ q$consumer$consume() # collect and execute assignments cat("j = ", j, "\n") Sys.sleep(.1) } fut <- future({ for(i in 1:10){ Sys.sleep(.2) # set j in the main thread substituting i into the expression q$producer$fireEval(print(i), env=list(i=i)) } }) q$consumer$start() # execute `comsume` at regular intervals # clean up q$destroy() ## End(Not run)
## Not run: library(parallel) library(future) library(promises) plan(multisession) q <- queue() # communicate from main session to child fut <- future({ for(i in 1:1000){ Sys.sleep(.1) q$consumer$consume() } }) q$producer$fireEval(stop("Stop that child")) cat(try(value(fut))) # Communicate from child to main session j <- 0 fut <- future({ for(i in 1:10){ Sys.sleep(.2) # set j in the main thread substituting i into the expression q$producer$fireEval(j <- i, env=list(i=i)) } }) while(j < 10){ q$consumer$consume() # collect and execute assignments cat("j = ", j, "\n") Sys.sleep(.1) } fut <- future({ for(i in 1:10){ Sys.sleep(.2) # set j in the main thread substituting i into the expression q$producer$fireEval(print(i), env=list(i=i)) } }) q$consumer$start() # execute `comsume` at regular intervals # clean up q$destroy() ## End(Not run)
Get/set redis configuration
redisConfig(config)
redisConfig(config)
config |
a function generating id strings |
Get/set the location for temporary files
redisIdGenerator(generator)
redisIdGenerator(generator)
generator |
a function generating id strings |
Reads and writes the queue to a redis db
Reads and writes the queue to a redis db
new()
Creates a redis source object.
RedisSource$new(id = redisIdGenerator()(), config = redisConfig())
id
An identifier to use for the queue
config
A configuration list for redux::hiredis
getRedisConnection()
Returns the underlying redis connection.
RedisSource$getRedisConnection()
pop()
removes n items from the source and returns them
RedisSource$pop(n = -1)
n
The number of records to pop (-1 indicates all available).
push()
Adds an item to the source.
RedisSource$push(msg, obj)
msg
A string indicating the signal.
obj
The object to associate with the signal.
destroy()
Cleans up source after use.
RedisSource$destroy()
finalize()
finalize
RedisSource$finalize()
clone()
The objects of this class are cloneable with this method.
RedisSource$clone(deep = FALSE)
deep
Whether to make a deep clone.
A Consumer class with common task handlers useful in Shiny apps
A Consumer class with common task handlers useful in Shiny apps
In addtion to 'eval' and 'function' signals, ShinyConsumer object process 'interrupt' and 'notify' signals for throwing errors and displying Shiny notifictions.
ipc::Consumer
-> ShinyConsumer
initHandlers()
Adds default handlers
ShinyConsumer$initHandlers()
clone()
The objects of this class are cloneable with this method.
ShinyConsumer$clone(deep = FALSE)
deep
Whether to make a deep clone.
Run Example Shiny Apps
shinyExample(application = c("progress", "changeReactive", "cancel"))
shinyExample(application = c("progress", "changeReactive", "cancel"))
application |
The example to run |
'progress' is an example application with a long running analysis that is cancelable and has a progress bar. 'changeReaction' is the old faithful example, but with the histogram colors changing over time. 'cancel' is an example with a cancelable long running process.
A Producer with methods specific for Shiny
A Producer with methods specific for Shiny
A Producer object with additional methods for firing interrupts, shiny notifications, and reactive value assignments.
ipc::Producer
-> ShinyProducer
fireInterrupt()
Sends an error with message msg
.
ShinyProducer$fireInterrupt(msg = "Interrupt")
msg
A string
fireNotify()
Sends a signal to create a shiny Notification with message msg
.
ShinyProducer$fireNotify(msg = "Notification")
msg
A string
fireAssignReactive()
Signals for assignment for reactive name
to value
.
ShinyProducer$fireAssignReactive(name, value)
name
The name of the reactive value.
value
The value to assign the reactive to.
clone()
The objects of this class are cloneable with this method.
ShinyProducer$clone(deep = FALSE)
deep
Whether to make a deep clone.
Create a Queue object
shinyQueue( source = defaultSource()$new(), producer = ShinyProducer$new(source), consumer = ShinyConsumer$new(source), session = shiny::getDefaultReactiveDomain() )
shinyQueue( source = defaultSource()$new(), producer = ShinyProducer$new(source), consumer = ShinyConsumer$new(source), session = shiny::getDefaultReactiveDomain() )
source |
The source for reading and writing the queue |
producer |
The producer for the source |
consumer |
The consumer of the source |
session |
A Shiny session |
Creates a Queue object for use with shiny, backed by ShinyTextSource, ShiyProducer and ShinyConsumer objects by default. The object will be cleaned up and destroyed on session end.
Stops a future run in a multicore plan
stopMulticoreFuture(x)
stopMulticoreFuture(x)
x |
The MulticoreFuture |
This function sends terminate and kill signals to the process running the future,
and will only work for futures run on a multicore plan. This approach is not
recommended for cases where you can listen for interrupts within the future
(with AsyncInterruptor
). However, for cases where long running code is
in an external library for which you don't have control, this can be the only way
to terminate the execution.
Note that multicore is not supported on Windows machines or within RStudio.
Get/set the location for temporary files
tempFileGenerator(tempfile)
tempFileGenerator(tempfile)
tempfile |
a function generating working file path (e.g. tempfile()) |
Reads and writes the queue to a text file
Reads and writes the queue to a text file
A wrapper around txtq
. This object saves signals
and associated objects to and queue, and retrieves them
for processing.
new()
Creates a TextFileSource
TextFileSource$new(filePath = tempFileGenerator()())
filePath
The path to the file.
pop()
removes n items from the source and returns them
TextFileSource$pop(n = -1)
n
The number of records to pop (-1 indicates all available).
push()
Adds an item to the source.
TextFileSource$push(msg, obj)
msg
A string indicating the signal.
obj
The object to associate with the signal.
destroy()
Cleans up source after use.
TextFileSource$destroy()
clone()
The objects of this class are cloneable with this method.
TextFileSource$clone(deep = FALSE)
deep
Whether to make a deep clone.