7 Distributed R
7.1 Basic distribution
Use spark_apply() to view the number of rows in each partition
- Cache a sample of flights
# Cache a 1% sample of the flights data inside Spark. arrdelay is cast to
# numeric so ft_binarizer() can threshold it: flights with an arrival delay
# greater than 15 minutes get delayed = 1, otherwise 0. compute() forces
# evaluation and registers the result as the "flights_sample" table.
flights_sample <- spark_flights %>%
  sample_frac(0.01) %>%
  mutate(arrdelay = as.numeric(arrdelay)) %>%
  ft_binarizer(
    input_col = "arrdelay",   # snake_case args; dotted `input.col` is defunct
    output_col = "delayed",
    threshold = 15
  ) %>%
  compute("flights_sample")
Navigate to the Storage page in the Spark UI
Pass nrow to spark_apply() to get the row count by partition
# nrow() runs once per partition, so each result row is one partition's size
flights_sample %>% spark_apply(nrow)
## # Source: table<sparklyr_tmp_488916ccc950> [?? x 1]
## # Database: spark_connection
## flightid
## <int>
## 1 6195
## 2 6137
## 3 6183
## 4 6116
## 5 12192
## 6 11641
## 7 10956
## 8 10686
- Pass a function to compute the average distance in each partition
# Each partition arrives as a data frame; return its mean flight distance
flights_sample %>%
  spark_apply(function(partition) {
    mean(as.numeric(partition$distance))
  })
## # Source: table<sparklyr_tmp_4889594205b9> [?? x 1]
## # Database: spark_connection
## flightid
## <dbl>
## 1 580
## 2 567
## 3 569
## 4 571
## 5 627
## 6 623
## 7 589
## 8 608
7.2 Use group_by
Pass a grouping field to be used instead of partitions
- Use the group_by argument to partition by the month field
# Group by month instead of by physical partition; name the output column
flights_sample %>%
  spark_apply(
    nrow,
    group_by = "month",
    columns = "count"
  )
## # Source: table<sparklyr_tmp_48893fe1865> [?? x 2]
## # Database: spark_connection
## month count
## <chr> <int>
## 1 8 6183
## 2 11 5325
## 3 9 5361
## 4 12 5416
## 5 1 6140
## 6 2 5734
## 7 3 6137
## 8 4 5907
## 9 5 6052
## 10 6 6116
## # ... with more rows
- Pass the same function from the previous exercise to calculate the average distance by month
# Average flight distance, computed per month rather than per partition
flights_sample %>%
  spark_apply(
    function(grp) mean(as.numeric(grp$distance)),
    group_by = "month",
    columns = "avg_distance"
  )
## # Source: table<sparklyr_tmp_4889bd20ea1> [?? x 2]
## # Database: spark_connection
## month avg_distance
## <chr> <dbl>
## 1 8 569
## 2 11 520
## 3 9 543
## 4 12 533
## 5 1 549
## 6 2 546
## 7 3 567
## 8 4 557
## 9 5 556
## 10 6 571
## # ... with more rows
7.3 Distributing packages
Use non-base-R packages to run the code in Spark
- Use broom::tidy() to run one glm() model per month
# Fit one logistic regression (delayed ~ arrdelay) per month on the workers
# and return the tidied coefficient table. The broom package must be
# installed on every worker node for this to run.
models <- spark_apply(
  flights_sample,
  function(e) broom::tidy(glm(delayed ~ arrdelay, data = e, family = "binomial")),
  # `columns` names the output columns, consistent with the earlier calls;
  # `names` is not a spark_apply() argument in current sparklyr releases.
  columns = c("term", "estimate", "std_error", "statistic", "p_value"),
  group_by = "month"
)
models
## # Source: table<sparklyr_tmp_4889752e7898> [?? x 6]
## # Database: spark_connection
## month term estimate std_error statistic p_value
## <chr> <chr> <dbl> <dbl> <dbl> <dbl>
## 1 8 (Intercept) -490 5314 -0.0923 0.926
## 2 8 arrdelay 31.6 343 0.0923 0.926
## 3 11 (Intercept) -492 6284 -0.0783 0.938
## 4 11 arrdelay 31.7 406 0.0782 0.938
## 5 9 (Intercept) -472 6154 -0.0767 0.939
## 6 9 arrdelay 30.5 397 0.0767 0.939
## 7 12 (Intercept) -490 4964 -0.0986 0.921
## 8 12 arrdelay 31.6 320 0.0987 0.921
## 9 1 (Intercept) -501 5437 -0.0922 0.927
## 10 1 arrdelay 32.3 351 0.0922 0.927
## # ... with more rows
- Close Spark connection
# Close the connection to the Spark cluster and release its resources
spark_disconnect(sc)