8 Spark pipelines
8.1 Recreate the transformations
Overview of how most of the existing code will be reused
- Use sdf_partition() to split the base flights table into training, testing, and rest samples
model_data <- sdf_partition(
tbl(sc, "flights"),
training = 0.01,
testing = 0.01,
rest = 0.98
)
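sdf_partition() returns a named list of Spark DataFrames, one element per weight. If you also want to refer to a split by name in later dplyr or SQL code, one option is to register it as a temporary table; a minimal sketch, assuming sparklyr's sdf_register() helper (the table name "training" is only an example):
# Register the training split so it can later be referenced with tbl(sc, "training")
training_tbl <- sdf_register(model_data$training, "training")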
- Recreate the dplyr code in the cached_flights variable from the previous unit
pipeline_df <- model_data$training %>%
mutate(
arrdelay = ifelse(arrdelay == "NA", 0, arrdelay),
depdelay = ifelse(depdelay == "NA", 0, depdelay)
) %>%
select(
month,
dayofmonth,
arrtime,
arrdelay,
depdelay,
crsarrtime,
crsdeptime,
distance
) %>%
mutate_all(as.numeric)
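No data is processed at this point; pipeline_df only records the dplyr transformations that ft_dplyr_transformer() will capture as a SQLTransformer stage. To double-check what was captured, the underlying SQL can be rendered, as in the sketch below (output omitted; the pipeline stage applies the same statement to whatever table it is handed):
# Show the SQL statement generated from the dplyr code above
show_query(pipeline_df)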
- Create a new Spark pipeline
flights_pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(
tbl = pipeline_df
) %>%
ft_binarizer(
input_col = "arrdelay",
output_col = "delayed",
threshold = 15
) %>%
ft_bucketizer(
input_col = "crsdeptime",
output_col = "dephour",
splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
) %>%
ft_r_formula(delayed ~ arrdelay + dephour) %>%
ml_logistic_regression()
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_48894091d631>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_4889709b1de9>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_488915dc3fc5>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_48892eb462c1>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormula (Estimator)
## | <r_formula_488934917e7e>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Parameters)
## | formula: delayed ~ arrdelay + dephour
## |--5 LogisticRegression (Estimator)
## | <logistic_regression_4889771b69ab>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Parameters)
## | elastic_net_param: 0
## | fit_intercept: TRUE
## | max_iter: 100
## | reg_param: 0
## | standardization: TRUE
## | threshold: 0.5
## | tol: 1e-06
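Individual stages can be pulled out of the pipeline for closer inspection. A minimal sketch, assuming sparklyr's ml_stage() helper, which accepts a stage index or name:
# Inspect the binarizer stage (stage 2) on its own
ml_stage(flights_pipeline, 2)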
8.2 Fit, evaluate, save
- Fit (train) the pipeline’s model
model <- ml_fit(flights_pipeline, model_data$training)
model
## PipelineModel (Transformer) with 5 stages
## <pipeline_48894091d631>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_4889709b1de9>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_488915dc3fc5>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_48892eb462c1>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormulaModel (Transformer)
## | <r_formula_488934917e7e>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Transformer Info)
## | formula: chr "delayed ~ arrdelay + dephour"
## |--5 LogisticRegressionModel (Transformer)
## | <logistic_regression_4889771b69ab>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficients: num [1:2] 26.326 -0.158
## | intercept: num -408
## | num_classes: int 2
## | num_features: int 2
## | threshold: num 0.5
- Use the newly fitted model to perform predictions using ml_transform()
predictions <- ml_transform(
x = model,
dataset = model_data$testing
)
- Use group_by() to see how the model performed
predictions %>%
group_by(delayed, prediction) %>%
tally()
## # Source: lazy query [?? x 3]
## # Database: spark_connection
## # Groups: delayed
## delayed prediction n
## <dbl> <dbl> <dbl>
## 1 0 0 55403
## 2 1.00 1.00 14717
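Because predictions is still a Spark table, a rough accuracy figure can be computed with the same dplyr verbs. A sketch, reusing the delayed and prediction columns shown above:
# Rough accuracy: proportion of rows where the prediction matches the label
predictions %>%
  mutate(correct = ifelse(delayed == prediction, 1, 0)) %>%
  summarise(accuracy = mean(correct, na.rm = TRUE))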
- Save the fitted model to disk using ml_save()
ml_save(model, "saved_model", overwrite = TRUE)
## NULL
list.files("saved_model")
## [1] "metadata" "stages"
- Save the un-fitted pipeline to disk using ml_save(); unlike the fitted model, it stores only the stage definitions, so it can be re-fitted with new data later
ml_save(flights_pipeline, "saved_pipeline", overwrite = TRUE)
## NULL
list.files("saved_pipeline")
## [1] "metadata" "stages"
- Close the Spark session
spark_disconnect(sc)
8.3 Reload model
Use the saved model inside a different Spark session
- Open a new Spark connection and reload the data
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.0.0")
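# file_columns is assumed to be the same column-specification list defined in
# the earlier unit where the flights CSV files were first read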
spark_flights <- spark_read_csv(
sc,
name = "flights",
path = "/usr/share/flights/data/",
memory = FALSE,
columns = file_columns,
infer_schema = FALSE
)
- Use ml_load() to reload the model directly into the Spark session
reload <- ml_load(sc, "saved_model")
reload
## PipelineModel (Transformer) with 5 stages
## <pipeline_48894091d631>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_4889709b1de9>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_488915dc3fc5>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_48892eb462c1>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormulaModel (Transformer)
## | <r_formula_488934917e7e>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## |--5 LogisticRegressionModel (Transformer)
## | <logistic_regression_4889771b69ab>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficients: num [1:2] 26.326 -0.158
## | intercept: num -408
## | num_classes: int 2
## | num_features: int 2
## | threshold: num 0.5
- Create a new table called current that pulls only today’s flights
library(lubridate)
current <- tbl(sc, "flights") %>%
filter(
month == !! month(now()),
dayofmonth == !! day(now())
)
show_query(current)
## <SQL>
## SELECT *
## FROM `flights`
## WHERE ((`month` = 2.0) AND (`dayofmonth` = 1))
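The !! operator forces month(now()) and day(now()) to be evaluated locally in R before the query is translated, which is why literal month and day values appear in the generated SQL above. An equivalent sketch using intermediate variables:
# Evaluate today's month and day in R, then splice the values into the query
today_month <- month(now())
today_day <- day(now())

current <- tbl(sc, "flights") %>%
  filter(
    month == !!today_month,
    dayofmonth == !!today_day
  )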
- Preview the contents of the current table
head(current)
## # Source: lazy query [?? x 31]
## # Database: spark_connection
## flightid year month dayofmonth dayofweek deptime crsdeptime arrtime
## <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
## 1 694864 2008 2 1 5 1219 1200 1454
## 2 694875 2008 2 1 5 0 1540 0
## 3 694884 2008 2 1 5 1237 1242 1407
## 4 694902 2008 2 1 5 1406 1415 1540
## 5 694993 2008 2 1 5 1153 1159 1338
## 6 694997 2008 2 1 5 2145 2055 2232
## # ... with 23 more variables: crsarrtime <chr>, uniquecarrier <chr>,
## # flightnum <chr>, tailnum <chr>, actualelapsedtime <chr>,
## # crselapsedtime <chr>, airtime <chr>, arrdelay <chr>, depdelay <chr>,
## # origin <chr>, dest <chr>, distance <chr>, taxiin <chr>, taxiout <chr>,
## # cancelled <chr>, cancellationcode <chr>, diverted <chr>,
## # carrierdelay <chr>, weatherdelay <chr>, nasdelay <chr>,
## # securitydelay <chr>, lateaircraftdelay <chr>, score <chr>
- Run predictions against the new data set
new_predictions <- ml_transform(
x = reload,
dataset = current
)
- Get a quick count of expected delayed flights
new_predictions %>%
  summarise(late_flights = sum(prediction, na.rm = TRUE))
## # Source: lazy query [?? x 1]
## # Database: spark_connection
## late_flights
## <dbl>
## 1 7664
8.4 Reload pipeline
Overview of how to use new data to re-fit the pipeline, thus creating a new pipeline model
- Use ml_load() to reload the pipeline into the Spark session
flights_pipeline <- ml_load(sc, "saved_pipeline")
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_48894091d631>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_4889709b1de9>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_488915dc3fc5>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_48892eb462c1>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormula (Estimator)
## | <r_formula_488934917e7e>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Parameters)
## | formula: delayed ~ arrdelay + dephour
## |--5 LogisticRegression (Estimator)
## | <logistic_regression_4889771b69ab>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Parameters)
## | elastic_net_param: 0
## | fit_intercept: TRUE
## | max_iter: 100
## | reg_param: 0
## | standardization: TRUE
## | threshold: 0.5
## | tol: 1e-06
- Create a new sample data set using sample_frac()
sample <- tbl(sc, "flights") %>%
sample_frac(0.001)
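If the sample needs to be reproducible, one alternative is sparklyr's sdf_sample(), which accepts a seed; a minimal sketch (the seed value is arbitrary):
# Draw an approximate 0.1% sample without replacement, fixed by a seed
sample <- sdf_sample(
  tbl(sc, "flights"),
  fraction = 0.001,
  replacement = FALSE,
  seed = 1024
)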
- Re-fit the model using ml_fit() and the new sample data
new_model <- ml_fit(flights_pipeline, sample)
new_model
## PipelineModel (Transformer) with 5 stages
## <pipeline_48894091d631>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_4889709b1de9>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_488915dc3fc5>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_48892eb462c1>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormulaModel (Transformer)
## | <r_formula_488934917e7e>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Transformer Info)
## | formula: chr "delayed ~ arrdelay + dephour"
## |--5 LogisticRegressionModel (Transformer)
## | <logistic_regression_4889771b69ab>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficients: num [1:2] 23.77421 -0.00874
## | intercept: num -369
## | num_classes: int 2
## | num_features: int 2
## | threshold: num 0.5
- Save the newly fitted model
ml_save(new_model, "new_model", overwrite = TRUE)
## NULL
list.files("new_model")
## [1] "metadata" "stages"
- Disconnect from Spark
spark_disconnect(sc)
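To close the loop, the refreshed model saved above can be used for scoring in a later session, just like the original model in section 8.3. A minimal sketch reusing only functions already shown; it assumes the flights data is re-read into the new session first, as in section 8.3:
sc <- spark_connect(master = "local", version = "2.0.0")

# Re-read the flights data here (as in section 8.3) so that tbl(sc, "flights")
# exists, then reload the re-fitted model and score the new data
scoring_model <- ml_load(sc, "new_model")

scored <- ml_transform(
  x = scoring_model,
  dataset = tbl(sc, "flights")
)

spark_disconnect(sc)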