Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
I’ve been drafting a new R package, sergeant, to work with Apache Drill and have found it much easier to manage Drill running as a single-node cluster than in drill-embedded mode (especially when I need to add a couple more nodes for additional capacity). That means running Apache Zookeeper, and I’ve had the occasional need to ping the Zookeeper process to see various stats (especially when I add some systems to the cluster).
Yes, it’s very easy and straightforward to run nc hostname 2181 from the command line to issue commands (or use the Zookeeper CLIs), but when I’m in R I like to stay in R. To that end, I made a small reference class that makes it super-easy to query Zookeeper from within R. Said class will eventually be in a sister package to sergeant (and a more detailed post on sergeant is forthcoming), but there may be others who want or need to poke at Zookeeper processes with 4-letter words, and this will help you stay within R. The only command not available is stmk, since I don’t have a pressing need to set the trace mask.
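For anyone curious what the nc round trip looks like from R, here is a minimal base R sketch. The helper name zk_four_letter and its default timeout are my own assumptions for illustration and are not part of the class in this post. It assumes the Zookeeper node answers the command and then closes the connection.

```r
# Hypothetical helper (not part of the reference class in this post):
# send one four-letter word to a Zookeeper node and return the reply lines.
zk_four_letter <- function(cmd, host = "localhost", port = 2181L, timeout = 5L) {
  # Four-letter words are exactly four lowercase ASCII letters.
  if (!is.character(cmd) || length(cmd) != 1L || !grepl("^[a-z]{4}$", cmd)) {
    stop("`cmd` must be a single four-letter word such as \"ruok\"")
  }
  con <- socketConnection(host = host, port = port, blocking = TRUE,
                          open = "r+", timeout = timeout)
  on.exit(close(con))
  # Write the command without a trailing newline, then read until the
  # server closes the connection (or the timeout expires).
  writeLines(cmd, con, sep = "")
  readLines(con, warn = FALSE)
}

# Needs a reachable Zookeeper node, so it is left commented out here:
# zk_four_letter("ruok", host = "drill.local")
```

The input check runs before any socket is opened, so a typo in the command fails fast instead of waiting on the network.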
After you source the self-contained reference class, you connect and then issue commands like so:

    zk <- zookeeper$new(host="drill.local")
    zk$ruok()
    zk$conf()
    zk$stat()
Drop a note in the comments (since this isn’t on GitHub yet) with any questions or issues.
    # Requires the stringi, purrr and R.utils packages to be installed.
    zookeeper <- setRefClass(

      Class="zookeeper",

      fields=list(
        host="character",
        port="integer",
        timeout="integer"
      ),

      methods=list(

        initialize=function(..., host="localhost", port=2181L, timeout=30L) {
          host <<- host ; port <<- port ; timeout <<- timeout
          callSuper(...)
        },

        available_commands=function() {
          return(sort(c("conf", "envi", "stat", "srvr", "wchs", "wchc", "wchp", "mntr",
                        "cons", "crst", "srst", "dump", "ruok", "dirs", "isro", "gtmk")))
        },

        connect=function(host="localhost", port=2181L, timeout=30L) {
          .self$host <- host ; .self$port <- port ; .self$timeout <- timeout
        },

        # key=value lines -> named list
        conf=function() {
          res <- .self$send_cmd("conf")
          res <- stringi::stri_split_fixed(res, "=", 2, simplify=TRUE)
          as.list(setNames(res[,2], res[,1]))
        },

        # same as conf(), minus the header line
        envi=function() {
          res <- .self$send_cmd("envi")
          res <- stringi::stri_split_fixed(res[-1], "=", 2, simplify=TRUE)
          as.list(setNames(res[,2], res[,1]))
        },

        # version line, client list, then "key: value" stats after a blank line
        stat=function() {
          res <- .self$send_cmd("stat")
          version <- stringi::stri_replace_first_regex(res[1], "^Zoo.*: ", "")
          res <- res[-(1:2)]
          sep <- which(res=="")
          clients <- stringi::stri_trim(res[1:(sep-1)])
          zstats <- stringi::stri_split_fixed(res[(sep+1):length(res)], ": ", 2, simplify=TRUE)
          zstats <- as.list(setNames(zstats[,2], zstats[,1]))
          list(version=version, clients=clients, stats=zstats)
        },

        srvr=function() {
          res <- .self$send_cmd("srvr")
          zstats <- stringi::stri_split_fixed(res, ": ", 2, simplify=TRUE)
          as.list(setNames(zstats[,2], zstats[,1]))
        },

        wchs=function() {
          res <- .self$send_cmd("wchs")
          conn_path <- stringi::stri_match_first_regex(res[1], "^([[:digit:]]+) connections watching ([[:digit:]]+) paths")
          tot_watch <- stringi::stri_match_first_regex(res[2], "Total watches:([[:digit:]]+)")
          list(connections_watching=conn_path[,2],
               paths=conn_path[,3],
               total_watches=tot_watch[,2])
        },

        wchc=function() {
          res <- purrr::discard(stringi::stri_trim(.self$send_cmd("wchc")), `==`, "")
          setNames(list(res[2:length(res)]), res[1])
        },

        # non-empty lines alternate between path and address
        wchp=function() {
          res <- purrr::discard(stringi::stri_trim(.self$send_cmd("wchp")), `==`, "")
          data.frame(
            path=res[seq(1, length(res), 2)],
            address=res[seq(2, length(res), 2)],
            stringsAsFactors=FALSE
          )
        },

        # tab-separated key/value lines -> named list
        mntr=function() {
          res <- .self$send_cmd("mntr")
          res <- stringi::stri_split_fixed(res, "\t", 2, simplify=TRUE)
          as.list(setNames(res[,2], res[,1]))
        },

        cons=function() {
          list(clients=stringi::stri_trim(purrr::discard(.self$send_cmd("cons"), `==`, "")))
        },

        crst=function() { message(.self$send_cmd("crst")) ; invisible() },
        srst=function() { message(.self$send_cmd("srst")) ; invisible() },
        dump=function() { paste0(.self$send_cmd("dump"), collapse="\n") },
        ruok=function() { .self$send_cmd("ruok") == "imok" },
        dirs=function() { .self$send_cmd("dirs") },
        isro=function() { .self$send_cmd("isro") },
        gtmk=function() { R.utils::intToBin(as.integer(.self$send_cmd("gtmk"))) },

        # open a socket, send the four-letter word, read the full reply
        send_cmd=function(cmd) {
          sock <- purrr::safely(socketConnection)
          con <- sock(host=.self$host, port=.self$port, blocking=TRUE, open="r+", timeout=.self$timeout)
          if (!is.null(con$result)) {
            con <- con$result
            cat(cmd, file=con)
            response <- readLines(con, warn=FALSE)
            a <- try(close(con))
            purrr::flatten_chr(stringi::stri_split_lines(response))
          } else {
            warning(sprintf("Error connecting to [%s:%s]", .self$host, .self$port))
          }
        }

      )

    )
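Several of the methods above (conf, envi, srvr, mntr) do the same thing: split each reply line at the first separator and return a named list. Here is a rough base R sketch of that step, so you can try it without a Zookeeper node. The helper parse_key_value is hypothetical, and the sample lines are made up for illustration; real values depend on your configuration.

```r
# Made-up sample of the key=value lines a `conf` reply contains;
# real values depend on your Zookeeper configuration.
sample_conf <- c(
  "clientPort=2181",
  "dataDir=/tmp/zookeeper/version-2",
  "tickTime=2000"
)

# Hypothetical helper: split each line at the FIRST separator only,
# so values that themselves contain the separator survive intact.
parse_key_value <- function(lines, sep = "=") {
  pos  <- regexpr(sep, lines, fixed = TRUE)
  keep <- pos > 0                       # drop lines without a separator
  keys <- substr(lines[keep], 1, pos[keep] - 1)
  vals <- substr(lines[keep], pos[keep] + nchar(sep), nchar(lines[keep]))
  as.list(setNames(vals, keys))
}

conf <- parse_key_value(sample_conf)
conf$clientPort
```

Every value comes back as a character string, just as it does from the class, so wrap fields such as clientPort in as.integer() if you need numbers. Passing sep = "\t" handles mntr-style lines the same way.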