8 Spark pipelines

8.1 Recreate the transformations

An overview of how most of the code from the previous units can be reused to build a Spark ML pipeline

  1. Use sdf_partition() to split the base flights table into training, testing, and rest samples (a quick size check follows the code)
model_data <- sdf_partition(
  tbl(sc, "flights"),
  training = 0.01,
  testing = 0.01,
  rest = 0.98
)
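Since sdf_partition() samples at random, it can be worth confirming the relative sizes of the splits before moving on. A minimal sanity check, assuming sdf_nrow() from sparklyr:
sdf_nrow(model_data$training)  # should be roughly 1% of the base table
sdf_nrow(model_data$testing)   # should be roughly the same size as training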
  2. Recreate the dplyr transformations stored in the cached_flights variable from the previous unit
pipeline_df <- model_data$training %>%
  mutate(
    # the columns were read as character, so missing values appear as the literal string "NA"
    arrdelay = ifelse(arrdelay == "NA", 0, arrdelay),
    depdelay = ifelse(depdelay == "NA", 0, depdelay)
  ) %>%
  select(
    month,
    dayofmonth,
    arrtime,
    arrdelay,
    depdelay,
    crsarrtime,
    crsdeptime,
    distance
  ) %>%
  mutate_all(as.numeric)  # cast every remaining column to numeric
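Because pipeline_df feeds the pipeline's first stage, it is worth confirming that every selected column survived the cast to numeric. A quick check, assuming dplyr's glimpse() works on the lazy Spark table (it prints the column types and a few rows):
glimpse(pipeline_df)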
  3. Create a new Spark pipeline that chains the dplyr transformations, the feature transformers, and a logistic regression model
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df  # converts the dplyr code above into a SQLTransformer stage
  ) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15  # arrivals more than 15 minutes late count as delayed
  ) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)  # four-hour departure buckets
  ) %>%
  ft_r_formula(delayed ~ arrdelay + dephour) %>%
  ml_logistic_regression()

flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_48894091d631> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_4889709b1de9> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_488915dc3fc5> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_48892eb462c1> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormula (Estimator)
##   |    <r_formula_488934917e7e> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      formula: delayed ~ arrdelay + dephour
##   |--5 LogisticRegression (Estimator)
##   |    <logistic_regression_4889771b69ab> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      elastic_net_param: 0
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
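Individual stages can be pulled out of the pipeline for inspection. A small sketch, assuming ml_stage() from sparklyr, which extracts a stage by its position in the pipeline:
ml_stage(flights_pipeline, 2)  # returns the Binarizer stage shown above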

8.2 Fit, evaluate, save

  1. Fit (train) the pipeline’s model
model <- ml_fit(flights_pipeline, model_data$training)
model
## PipelineModel (Transformer) with 5 stages
## <pipeline_48894091d631> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_4889709b1de9> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_488915dc3fc5> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_48892eb462c1> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormulaModel (Transformer)
##   |    <r_formula_488934917e7e> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Transformer Info)
##   |      formula:  chr "delayed ~ arrdelay + dephour" 
##   |--5 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_4889771b69ab> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficients:  num [1:2] 26.326 -0.158 
##   |      intercept:  num -408 
##   |      num_classes:  int 2 
##   |      num_features:  int 2 
##   |      threshold:  num 0.5
  2. Use the newly fitted model to perform predictions using ml_transform(); a quick look at the results follows the code
predictions <- ml_transform(
  x = model,
  dataset = model_data$testing
)
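Before aggregating, it can help to eyeball a few predictions next to the label. A minimal peek, assuming the column names shown in the pipeline printout:
predictions %>%
  select(arrdelay, delayed, dephour, prediction) %>%
  head()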
  3. Use group_by() to see how the model performed
predictions %>%
  group_by(delayed, prediction) %>%
  tally()
## # Source:   lazy query [?? x 3]
## # Database: spark_connection
## # Groups:   delayed
##   delayed prediction     n
##     <dbl>      <dbl> <dbl>
## 1    0          0    55403
## 2    1.00       1.00 14717
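Only the two diagonal combinations appear, so every row in the testing split was classified correctly. The same result can be expressed as a single accuracy figure; a minimal sketch, assuming delayed and prediction are both 0/1 numeric columns:
predictions %>%
  mutate(correct = ifelse(delayed == prediction, 1, 0)) %>%
  summarise(accuracy = mean(correct, na.rm = TRUE))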
  4. Save the model to disk using ml_save()
ml_save(model, "saved_model", overwrite = TRUE)
## NULL
list.files("saved_model")
## [1] "metadata" "stages"
  5. Save the pipeline using ml_save()
ml_save(flights_pipeline, "saved_pipeline", overwrite = TRUE)
## NULL
list.files("saved_pipeline")
## [1] "metadata" "stages"
  6. Close the Spark session
spark_disconnect(sc)

8.3 Reload model

Use the saved model inside a different Spark session

  1. Open a new Spark connection and reload the data
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.0.0")
spark_flights <- spark_read_csv(
  sc,
  name = "flights",
  path = "/usr/share/flights/data/",
  memory = FALSE,
  columns = file_columns,  # the column name/type spec defined in the previous unit
  infer_schema = FALSE
)
  2. Use ml_load() to reload the model directly into the Spark session
reload <- ml_load(sc, "saved_model")
reload
## PipelineModel (Transformer) with 5 stages
## <pipeline_48894091d631> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_4889709b1de9> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_488915dc3fc5> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_48892eb462c1> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormulaModel (Transformer)
##   |    <r_formula_488934917e7e> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |--5 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_4889771b69ab> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficients:  num [1:2] 26.326 -0.158 
##   |      intercept:  num -408 
##   |      num_classes:  int 2 
##   |      num_features:  int 2 
##   |      threshold:  num 0.5
  3. Create a new table called current that pulls only today’s flights. The !! operator forces month(now()) and day(now()) to be evaluated in R, so the generated SQL contains their literal values
library(lubridate)

current <- tbl(sc, "flights") %>%
  filter(
    month == !! month(now()),
    dayofmonth == !! day(now())
  )

show_query(current)
## <SQL>
## SELECT *
## FROM `flights`
## WHERE ((`month` = 2.0) AND (`dayofmonth` = 1))
  4. Preview the current table
head(current)
## # Source:   lazy query [?? x 31]
## # Database: spark_connection
##   flightid year  month dayofmonth dayofweek deptime crsdeptime arrtime
##   <chr>    <chr> <chr> <chr>      <chr>     <chr>   <chr>      <chr>  
## 1 694864   2008  2     1          5         1219    1200       1454   
## 2 694875   2008  2     1          5         0       1540       0      
## 3 694884   2008  2     1          5         1237    1242       1407   
## 4 694902   2008  2     1          5         1406    1415       1540   
## 5 694993   2008  2     1          5         1153    1159       1338   
## 6 694997   2008  2     1          5         2145    2055       2232   
## # ... with 23 more variables: crsarrtime <chr>, uniquecarrier <chr>,
## #   flightnum <chr>, tailnum <chr>, actualelapsedtime <chr>,
## #   crselapsedtime <chr>, airtime <chr>, arrdelay <chr>, depdelay <chr>,
## #   origin <chr>, dest <chr>, distance <chr>, taxiin <chr>, taxiout <chr>,
## #   cancelled <chr>, cancellationcode <chr>, diverted <chr>,
## #   carrierdelay <chr>, weatherdelay <chr>, nasdelay <chr>,
## #   securitydelay <chr>, lateaircraftdelay <chr>, score <chr>
  5. Run predictions against the new data set
new_predictions <- ml_transform(
  x = reload,
  dataset = current
)
  6. Get a quick count of expected delayed flights; a by-hour breakdown follows the output
new_predictions %>%
  summarise(late_flights = sum(prediction, na.rm = TRUE))
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##   late_flights
##          <dbl>
## 1         7664
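The same lazy query can be extended to see when the expected delays concentrate during the day. A hedged sketch, assuming the dephour bucket column created by the pipeline is present in the output:
new_predictions %>%
  group_by(dephour) %>%
  summarise(late_flights = sum(prediction, na.rm = TRUE)) %>%
  arrange(dephour)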

8.4 Reload pipeline

An overview of how to re-fit the pipeline with new data, producing a new pipeline model

  1. Use ml_load() to reload the pipeline into the Spark session
flights_pipeline <- ml_load(sc, "saved_pipeline")
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_48894091d631> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_4889709b1de9> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_488915dc3fc5> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_48892eb462c1> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormula (Estimator)
##   |    <r_formula_488934917e7e> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      formula: delayed ~ arrdelay + dephour
##   |--5 LogisticRegression (Estimator)
##   |    <logistic_regression_4889771b69ab> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      elastic_net_param: 0
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
  2. Create a new sample data set using sample_frac() (a row count check follows the code)
sample <- tbl(sc, "flights") %>%
  sample_frac(0.001) 
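Because sample_frac() is evaluated lazily, a quick row count confirms that the sample has the expected size. Assuming the same dplyr verbs as above:
sample %>% tally()  # should be roughly 0.1% of the flights table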
  3. Re-fit the model using ml_fit() and the new sample data; a coefficient comparison follows the printout
new_model <- ml_fit(flights_pipeline, sample)
new_model
## PipelineModel (Transformer) with 5 stages
## <pipeline_48894091d631> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_4889709b1de9> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_488915dc3fc5> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_48892eb462c1> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormulaModel (Transformer)
##   |    <r_formula_488934917e7e> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Transformer Info)
##   |      formula:  chr "delayed ~ arrdelay + dephour" 
##   |--5 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_4889771b69ab> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficients:  num [1:2] 23.77421 -0.00874 
##   |      intercept:  num -369 
##   |      num_classes:  int 2 
##   |      num_features:  int 2 
##   |      threshold:  num 0.5
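The refreshed fit can be compared against the earlier coefficients (26.326, -0.158) printed in section 8.2. A minimal sketch, assuming ml_stage() can extract the logistic regression stage and that its coefficients and intercept fields are accessible on the returned object:
new_lr <- ml_stage(new_model, 5)
new_lr$coefficients  # num [1:2] 23.77421 -0.00874
new_lr$intercept     # num -369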
  4. Save the newly fitted model
ml_save(new_model, "new_model", overwrite = TRUE)
## NULL
list.files("new_model")
## [1] "metadata" "stages"
  5. Disconnect from Spark
spark_disconnect(sc)