Site icon R-bloggers

nanonext – how it provides a concurrency framework for R

[This article was first published on shikokuchuo{net}, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
< aside> Shikokuchuo

The nanonext package, featured in RStudio’s Top 40 New CRAN Packages for January 2022 has been steadily evolving, adding significant new features, with the aysnc ‘Aio’ interface now considered complete since release 0.3.0 hit CRAN earlier in March.

Hence, time to introduce why this is a ‘concurrency framework’ and not ‘just’ messaging.

nanonext is a lightweight binding for the NNG (nanomsg next gen) C library, written in a combination of R and C with no package dependencies. For the experts who need no further introduction, they may wish to skip straight to the pkgdown site which contains a more systematic exposition of the features: https://shikokuchuo.net/nanonext/.

Aios

These are self-resolving objects containing the results of an async operation.

The purpose of this section is really to highlight that this is true async – the real thing. No event loops, nor any other similar constraints. This provides the freedom to be much more expressive when coding. Below, we perform actions out of order – receive before we send – and it is all totally fine.

# loading the package and creating sockets
library(nanonext)
s1 <- socket("pair", listen = "inproc://nano")
s2 <- socket("pair", dial = "inproc://nano")

# an async receive is requested, but no messages are waiting (yet to be sent)
msg <- s2 |> recv_aio()
msg
< recvAio >
 - $data for message data
 - $raw for raw message
msg$data
'unresolved' logi NA

send_aio() and recv_aio() functions return immediately with an ‘Aio’ object, but perform their operations async. An ‘Aio’ object returns an ‘unresolved’ logical NA value whilst its asynchronous operation is ongoing. This is an actual NA value, and Shiny will, for example, recognise it as being ‘non-truthy’.

Next we perform a send, and the ‘Aio’ resolves immediately once we do that. 1

res <- s1 |> send_aio(data.frame(a = 1, b = 2))

# now that a message has been sent, the 'recvAio' automatically resolves
msg$data
  a b
1 1 2
msg$raw
  [1] 58 0a 00 00 00 03 00 04 01 03 00 03 05 00 00 00 00 05 55 54 46
 [22] 2d 38 00 00 03 13 00 00 00 02 00 00 00 0e 00 00 00 01 3f f0 00
 [43] 00 00 00 00 00 00 00 00 0e 00 00 00 01 40 00 00 00 00 00 00 00
 [64] 00 00 04 02 00 00 00 01 00 04 00 09 00 00 00 05 6e 61 6d 65 73
 [85] 00 00 00 10 00 00 00 02 00 04 00 09 00 00 00 01 61 00 04 00 09
[106] 00 00 00 01 62 00 00 04 02 00 00 00 01 00 04 00 09 00 00 00 05
[127] 63 6c 61 73 73 00 00 00 10 00 00 00 01 00 04 00 09 00 00 00 0a
[148] 64 61 74 61 2e 66 72 61 6d 65 00 00 04 02 00 00 00 01 00 04 00
[169] 09 00 00 00 09 72 6f 77 2e 6e 61 6d 65 73 00 00 00 0d 00 00 00
[190] 02 80 00 00 00 ff ff ff ff 00 00 00 fe

So isn’t this still ‘just’ messaging?

Well, we can start with introducing a little helper function unresolved(). This allows us to perform actions which depend on resolution of the Aio (completion of the async operation), both before and after. This means there is no need to ever wait (block) for an Aio to resolve, as the below demonstrates:

msg <- recv_aio(s2)

# unresolved() queries for resolution itself so no need to use it again within the while loop
while (unresolved(msg)) {
  # do real stuff here not just the toy actions below
  cat("unresolved")
  send_aio(s1, "resolved")
  Sys.sleep(0.1)  
}
unresolved
# resolution of the Aio exits the while loop - now do the stuff which depends on its value
msg$data
[1] "resolved"

Alternatively, an Aio may also be called explicitly by wrapping it in call_aio(). This will wait for completion of the Aio (blocking) if it is yet to resolve.

# to access the resolved value directly (waiting if required)
call_aio(msg)$data
[1] "resolved"

The above two methods provide full flexibility for handling async operations as desired.

RPC

So we move closer to explaining how this is a ‘concurrency framework’. And this involves explaining a little about NNG’s ‘scalability protocols’ – so-called as they are designed to be masssively scalable.

These can be thought of as communications patterns built on top of raw bytestream connections. So a socket of a certain type will always interact with another in a prescribed way. No matter the platform, and no matter the language binding.

Probably the most classic pattern for NNG is the req/rep (request/reply). This is a guaranteed communications pattern that will not drop messages, retrying under the hood if messages cannot be delivered for whatever reason. This can be utilised to implement ‘traditional’ RPC (remote prodecure calls), a bastion of systems/distributed computing. 2

This is where a requestor (client) sends a message to an executor (server), which performs the requested action and sends back a reply. {nanonext} provides the convenience functions request() and reply() which implements this logic for use between 2 R processes, where the requestor supplies data to the reply node, to which it applies an arbitrary function before sending back the return value.

This can be meaningfully used to perform computationally-expensive calculations or I/O-bound operations such as writing large amounts of data to disk in a separate ‘server’ process running concurrently.

Server process: reply() will wait for a message and apply a function, in this case rnorm(), before sending back the result.

# This code block is run in a separate R process to knit this document

library(nanonext)
rep <- socket("rep", listen = "tcp://127.0.0.1:6546")
ctxp <- context(rep)
reply(ctxp, execute = rnorm, send_mode = "raw") 

Client process: request() performs an async send and receive request and returns immediately with an Aio object.

library(nanonext)
req <- socket("req", dial = "tcp://127.0.0.1:6546")
ctxq <- context(req)
aio <- request(ctxq, data = 1e8, recv_mode = "double", keep.raw = FALSE)

At this point, the client can run additional code concurrent with the server processing the request. The Aio will then resolve automatically or can be called as required.

call_aio(aio)$data |> str()
 num [1:100000000] -0.0523 0.509 1.9997 0.2791 -0.0287 ...

And this is how nanonext provides a true concurrency framework. The package provides the necessary tools to implement anything from a walkie-talkie to distributed computing clusters and everything in between.

mirai

A small (tiny) package has also been released to CRAN in February 2022 that exposes the functionality of executing arbitrary R expressions asynchronously for use on a single machine. It is called ‘mirai’, the Japanese for ‘future’. Everything revolves around one single function. It is very minimalistic. Designed to be intuitive to use, a short intro can be found here: https://shikokuchuo.net/mirai/.


  1. Or more precisely, the Aio will resolve the next time it is queried – but practically this is the same thing, as the value cannot be used unless it is queried. This is akin to ‘Schrödinger’s Cat’ – if we never look into the box, we simply don’t know the state, but as soon as we look, we will get a resolution one way or another. Here, if the value is never used, it could remain in a state of ‘superposition’ but as soon as it is required (even if we are only seeking metadata such as its length rather than the actual value), it will resolve either to an ‘unresolved’ NA or its actual value.↩︎

  2. Although the generic term includes ‘remote’, obviously everything can also happen on the same machine in separate processes.↩︎

To leave a comment for the author, please follow the link and comment on their blog: shikokuchuo{net}.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.