7 Distributed R

7.1 Basic distribution

Use spark_apply() to view the row count of each partition

  1. Cache a sample of flights
flights_sample <- spark_flights %>%
  sample_frac(0.01) %>%                    # take a 1% sample
  mutate(arrdelay = as.numeric(arrdelay)) %>%
  ft_binarizer(
    input_col = "arrdelay",                # 1 when the delay exceeds 15 minutes
    output_col = "delayed",
    threshold = 15
  ) %>%
  compute("flights_sample")                # cache the result in Spark memory
  2. Navigate to the Storage page in the Spark UI
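
If the Spark UI is not already open in the browser, it can be launched from R; this assumes sc is the Spark connection opened earlier:

spark_web(sc)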

  3. Pass nrow to spark_apply() to get the row count by partition

spark_apply(flights_sample, nrow)
## # Source:   table<sparklyr_tmp_488916ccc950> [?? x 1]
## # Database: spark_connection
##   flightid
##      <int>
## 1     6195
## 2     6137
## 3     6183
## 4     6116
## 5    12192
## 6    11641
## 7    10956
## 8    10686
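
The eight rows above correspond to the eight partitions of the cached table. As a quick cross-check, sdf_num_partitions() reports the partition count directly; a minimal sketch, assuming the same flights_sample table:

sdf_num_partitions(flights_sample)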
  4. Pass a function that computes the average distance within each partition
spark_apply(
  flights_sample,
  function(x) mean(as.numeric(x$distance))
)
## # Source:   table<sparklyr_tmp_4889594205b9> [?? x 1]
## # Database: spark_connection
##   flightid
##      <dbl>
## 1      580
## 2      567
## 3      569
## 4      571
## 5      627
## 6      623
## 7      589
## 8      608

7.2 Use group_by

Pass a grouping field so that spark_apply() runs the function once per group instead of once per physical partition

  1. Use the group_by argument to partition by the month field
spark_apply(flights_sample, nrow, group_by = "month", columns = "count")
## # Source:   table<sparklyr_tmp_48893fe1865> [?? x 2]
## # Database: spark_connection
##    month count
##    <chr> <int>
##  1 8      6183
##  2 11     5325
##  3 9      5361
##  4 12     5416
##  5 1      6140
##  6 2      5734
##  7 3      6137
##  8 4      5907
##  9 5      6052
## 10 6      6116
## # ... with more rows
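
The groups come back in no particular order. To sort the result, dplyr verbs can be chained onto the output; a sketch, assuming month is the character column shown above:

spark_apply(flights_sample, nrow, group_by = "month", columns = "count") %>%
  arrange(as.integer(month))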
  2. Pass the same function from the previous exercise to calculate the average distance by month
spark_apply(
  flights_sample,
  function(x) mean(as.numeric(x$distance)),
  group_by = "month",
  columns = "avg_distance"
)
## # Source:   table<sparklyr_tmp_4889bd20ea1> [?? x 2]
## # Database: spark_connection
##    month avg_distance
##    <chr>        <dbl>
##  1 8              569
##  2 11             520
##  3 9              543
##  4 12             533
##  5 1              549
##  6 2              546
##  7 3              567
##  8 4              557
##  9 5              556
## 10 6              571
## # ... with more rows
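
An aggregation this simple does not require spark_apply(); the same result can be computed with dplyr verbs, which translate to Spark SQL and avoid serializing data into R processes on each worker. A minimal sketch:

flights_sample %>%
  group_by(month) %>%
  summarise(avg_distance = mean(as.numeric(distance), na.rm = TRUE))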

7.3 Distributing packages

Use packages beyond base R in the code that Spark runs

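By default, spark_apply() distributes the packages found in the local .libPaths() to each worker, which is how broom becomes available to the cluster in the next step. The packages argument narrows this to a named set; a sketch that reuses the model function from the exercise below:

spark_apply(
  flights_sample,
  function(e) broom::tidy(glm(delayed ~ arrdelay, data = e, family = "binomial")),
  packages = c("broom"),
  group_by = "month"
)
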
  1. Use broom::tidy() to run one glm() model per month
models <- spark_apply(
  flights_sample,
  # fit one logistic regression per month and return the tidied coefficients
  function(e) broom::tidy(glm(delayed ~ arrdelay, data = e, family = "binomial")),
  columns = c("term", "estimate", "std_error", "statistic", "p_value"),
  group_by = "month"
)

models
## # Source:   table<sparklyr_tmp_4889752e7898> [?? x 6]
## # Database: spark_connection
##    month term        estimate std_error statistic p_value
##    <chr> <chr>          <dbl>     <dbl>     <dbl>   <dbl>
##  1 8     (Intercept)   -490        5314   -0.0923   0.926
##  2 8     arrdelay        31.6       343    0.0923   0.926
##  3 11    (Intercept)   -492        6284   -0.0783   0.938
##  4 11    arrdelay        31.7       406    0.0782   0.938
##  5 9     (Intercept)   -472        6154   -0.0767   0.939
##  6 9     arrdelay        30.5       397    0.0767   0.939
##  7 12    (Intercept)   -490        4964   -0.0986   0.921
##  8 12    arrdelay        31.6       320    0.0987   0.921
##  9 1     (Intercept)   -501        5437   -0.0922   0.927
## 10 1     arrdelay        32.3       351    0.0922   0.927
## # ... with more rows
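
The fitted coefficients still live in Spark; to inspect or plot them in the local R session, collect them first:

models %>% collect()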
  2. Close the Spark connection
spark_disconnect(sc)