# Analyzing the Gutenberg Books Corpus - part 2

In this notebook, we will use the Gutenberg Corpus in the same form as last week. 

In the [first analysis notebook](https://github.com/dslab2018/dslab2018.github.io/blob/master/notebooks/DSLab_week7_gutenberg_corpus.ipynb) we explored various RDD methods and finally built an N-gram viewer for the Gutenberg books project. Now we will use the corpus to train a simple language classification model using [Spark's machine learning library](http://spark.apache.org/docs/latest/mllib-guide.html) and Spark DataFrames.

<div class="alert alert-success">
<h3>The structure of this lab is as follows:</h3>

<ol>
    <li>initializing Spark and loading data</li>
    <li>construction of Spark DataFrames</li>
    <li>using core DataFrame functionality and comparisons to RDD methods</li>
    <li>using the Spark ML library for vectorization</li>
    <li>building a classifier pipeline</li>
</ol>
</div>

## Set up and launch the Spark runtime *on your laptop*

In [1]:
# set this to the base spark directory on your system
spark_home = '/Users/rok/src/spark'
try:
    import findspark
    findspark.init(spark_home)
except ModuleNotFoundError as e:
    print('Info: {}'.format(e))

import getpass
import pyspark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Gutenberg text modelling") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()

In [4]:
sc = spark.sparkContext
spark

## Load the data

**TODO**: download the gutenberg_cleaned_rdd and extract it into the `data` directory in the base path of this repository.

Load this as `cleaned_rdd` using `sc.sequenceFile`.

In [5]:
cleaned_rdd = sc.sequenceFile('../data/gutenberg_cleaned_rdd/')

In [6]:
%time cleaned_rdd.cache().count()

CPU times: user 36.5 ms, sys: 13.9 ms, total: 50.4 ms
Wall time: 5.5 s


25198

In [7]:
cleaned_rdd.first()[1][:200]

'h_sides_dion_cassius_lx_35_says_that_seneca_composed_an_greek_apokolokuntosis_or_pumpkinification_of_claudius_after_his_death_the_title_being_a_parody_of_the_usual_greek_apotheosis_but_this_title_is_n'

Note that there were a few further pre-processing steps: we removed all punctuation, made the text lowercase, and replaced whitespace characters with "_".
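
The exact cleaning code is not part of this notebook, but the three steps described above can be sketched in plain Python. The function name `clean_text` and the regular expressions are our own assumptions, chosen only to reproduce the look of the sample shown above.

```python
import re

def clean_text(raw):
    """Lowercase, drop punctuation, and join words with underscores.

    Hypothetical helper: the real pre-processing may differ in details.
    """
    # remove everything that is neither a word character nor whitespace
    no_punct = re.sub(r"[^\w\s]", "", raw.lower())
    # collapse each run of whitespace into a single underscore
    return re.sub(r"\s+", "_", no_punct.strip())

print(clean_text("Dion Cassius (lx, 35) says that Seneca composed..."))
# dion_cassius_lx_35_says_that_seneca_composed
```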

### Load in the metadata dictionary and broadcast it

Just as in the previous notebook, we will load our pre-generated metadata dictionary and broadcast it to all the executors. 

In [8]:
import json

with open('../data/gutenberg_metadata.json', 'r') as f :
    meta = json.load(f)

In [9]:
# TODO: create meta_b by broadcasting the `meta` dictionary
meta_b = spark.sparkContext.broadcast(meta)

## DataFrames

A [`DataFrame`](http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes) is analogous to a Pandas or R dataframe. Since v2.0, DataFrames have been the "official" API for Spark and, importantly, the development of the [machine learning library](http://spark.apache.org/docs/latest/ml-guide.html) is focused exclusively on the DataFrame API. Many low-level optimizations have been developed for DataFrames in recent versions of Spark, which has also somewhat reduced the overhead of using Python with Spark. Using DataFrames allows you to specify types for your operations, which means that they can be offloaded to the Scala backend and optimized by the runtime. 

However, you frequently will find that there simply is no easy way of doing a particular operation with the DataFrame methods and will need to resort to the lower-level RDD API. 

## Creating a DataFrame

Here we will create a DataFrame out of the RDD that we were using in the previous exercises. The DataFrame is a much more natural fit for this dataset, because the book metadata simply become columns that can then be used in queries. 

To begin, we will map the RDD elements to type [Row](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row) and recast the data as a DataFrame. Note that we are lazy here and are just using the default `StringType` for all columns, but we could be more specific and use e.g. `IntegerType` for the `gid` field. 

In [10]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StringType, ArrayType, StructField, StructType

# set up the Row 
df = spark.createDataFrame(
    cleaned_rdd.map(lambda x: Row(**meta_b.value[x[0]], text=x[1])), 
).cache()

For inspection, the `Row` class can be conveniently cast into a `dict`:

In [11]:
# first row
df.first().asDict()

{'author_id': '1308',
 'author_name': ['Seneca', ' Lucius Annaeus'],
 'birth_year': '1863',
 'death_year': '65',
 'downloads': '186',
 'first_name': 'Lucius Annaeus',
 'gid': '10001',
 'language': 'en',
 'last_name': 'Seneca',
 'license': 'Public domain in the USA.',
 'subtitle': '',
 'text': 'h_sides_dion_cassius_lx_35_says_that_seneca_composed_an_greek_apokolokuntosis_or_pumpkinification_of_claudius_after_his_death_the_title_being_a_parody_of_the_usual_greek_apotheosis_but_this_title_is_not_given_in_the_mss_of_the_ludus_de_morte_claudii_nor_is_there_anything_in_the_piece_which_suits_the_title_very_well_as_a_literary_form_the_piece_belongs_to_the_class_called_satura_menippea_a_satiric_medley_in_prose_and_verse_this_text_is_that_of_buecheler_with_a_few_trifling_changes_which_are_indicated_in_the_notes_we_have_been_courteously_allowed_by_messrs_weidmann_to_use_this_text_i_have_to_acknowledge_the_help_of_mr_balls_notes_from_which_i_have_taken_a_few_references_but_my_translation_was_made_

In [12]:
df.columns

['author_id',
 'author_name',
 'birth_year',
 'death_year',
 'downloads',
 'first_name',
 'gid',
 'language',
 'last_name',
 'license',
 'subtitle',
 'text',
 'title']

The DataFrame includes convenience methods for quickly inspecting the data. For example:

In [13]:
df.describe('birth_year').show()

+-------+------------------+
|summary|        birth_year|
+-------+------------------+
|  count|             20934|
|   mean|1829.9672587614018|
| stddev|114.48079532175821|
|    min|           -100 BC|
|    max|               973|
+-------+------------------+
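
The `min` and `max` rows above look odd for a column of years. They are what we would expect if the values are compared as strings, which fits our choice of `StringType` for all columns. The small check below uses plain Python lists rather than Spark; the list of years is a made-up sample containing a few values seen in the output, and `parse_year` is a hypothetical helper showing one way to get numeric ordering.

```python
import re

# made-up sample mixing values that appear in the notebook output
years = ["1863", "65", "-100 BC", "973", "1835"]

# strings are compared character by character, so "-..." sorts first and "9..." last
print(min(years), "|", max(years))
# -100 BC | 973

def parse_year(value):
    """Hypothetical helper: pull the leading (possibly negative) integer out of a year string."""
    match = re.match(r"-?\d+", value or "")
    return int(match.group()) if match else None

numeric = [parse_year(y) for y in years]
print(min(numeric), max(numeric))
# -100 1863
```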



Certain operations are much more convenient with the DataFrame API, such as `groupBy`, which yields a special [`GroupedData`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object. Check out the API for the different operations you can perform on grouped data -- here we use `count` to get the equivalent of our author-count from the previous exercise:

In [14]:
(df.groupBy('author_name')
   .count()
   .sort('count', ascending=False)
   .show()
)

+--------------------+-----+
|         author_name|count|
+--------------------+-----+
|           [Various]| 1654|
|                null|  835|
|         [Anonymous]|  278|
|[Balzac,  Honoré de]|  121|
|[Kingston,  Willi...|  113|
|      [Twain,  Mark]|  104|
|[Ballantyne,  R. ...|   95|
|[Jacobs,  W. W. (...|   94|
|           [Unknown]|   92|
|[Shakespeare,  Wi...|   87|
|    [Pepys,  Samuel]|   85|
|[Fenn,  George Ma...|   83|
| [Dumas,  Alexandre]|   75|
|     [Verne,  Jules]|   74|
|     [Sand,  George]|   73|
|[Howells,  Willia...|   70|
|[Churchill,  Wins...|   67|
| [Dickens,  Charles]|   61|
|[Henty,  G. A. (G...|   60|
|      [Harte,  Bret]|   58|
+--------------------+-----+
only showing top 20 rows



### Creating new columns

Let's make a new column with a publication date, similar to the previous notebook:

In [15]:
df = df.withColumn('publication_year', (df.birth_year + 40))

**TODO**: Show author name, title and publication year; sort by publication_year in descending order

In [16]:
df.select('author_name', 'title', 'publication_year').sort(df.publication_year.desc()).show()

+-----------------+--------------------+----------------+
|      author_name|               title|publication_year|
+-----------------+--------------------+----------------+
|    [Blade,  Zoë]|            Identity|          2021.0|
|    [Blade,  Zoë]|     Less than Human|          2021.0|
|[Doctorow,  Cory]|  A Place so Foreign|          2011.0|
|[Doctorow,  Cory]|Eastern Standard ...|          2011.0|
|[Doctorow,  Cory]|      Little Brother|          2011.0|
|[Doctorow,  Cory]|Return to Pleasur...|          2011.0|
|[Doctorow,  Cory]|Someone Comes to ...|          2011.0|
|[Doctorow,  Cory]|Ebooks: Neither E...|          2011.0|
|[Doctorow,  Cory]|Home Again, Home ...|          2011.0|
|[Doctorow,  Cory]|          Printcrime|          2011.0|
|[Doctorow,  Cory]|           Craphound|          2011.0|
|[Doctorow,  Cory]|Super Man and the...|          2011.0|
|[Doctorow,  Cory]|Shadow of the Mot...|          2011.0|
|[Camacho,  Jorge]|La Majstro kaj Ma...|          2006.0|
|[Camacho,  Jo

# Language classification with Spark ML

Here we will use some of the same techniques we developed in the last exercise, but this time we will use the built-in methods of the [Spark ML library](http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml#) instead of coding up our own transformation functions. We will apply the N-gram technique to build a simple language classification model. 

The method is rather straightforward and outlined in [Cavnar & Trenkle 1994](http://odur.let.rug.nl/~vannoord/TextCat/textcat.pdf):

For each of the English/German training sets:

1. tokenize the text (spaces are also tokens, so we replace them with "_")
2. extract N-grams where 1 ≤ N ≤ 5
3. determine the most common N-grams for each corpus
4. encode both sets of documents using the combined top ngrams
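
The four steps above can be sketched in plain Python before we turn to Spark. This is only an illustration under our own assumptions: the helper names (`char_ngrams`, `top_ngrams`, `encode`), the toy sentences, the choice of N from 1 to 3, and the cutoff `k=5` are all made up for the example.

```python
from collections import Counter

def char_ngrams(text, n):
    """All overlapping character n-grams of a string."""
    return [text[i:i + n] for i in range(len(text) - n + 1)]

def ngram_counts(doc, ns):
    """Count the n-grams of one document for every n in ns."""
    counts = Counter()
    for n in ns:
        counts.update(char_ngrams(doc, n))
    return counts

def top_ngrams(docs, ns=(1, 2, 3), k=5):
    """The k most common n-grams over a whole training set."""
    totals = Counter()
    for doc in docs:
        totals.update(ngram_counts(doc, ns))
    return [gram for gram, _ in totals.most_common(k)]

def encode(doc, vocabulary, ns=(1, 2, 3)):
    """Represent a document as counts of the vocabulary n-grams."""
    counts = ngram_counts(doc, ns)
    return [counts[gram] for gram in vocabulary]

# toy training sets, already cleaned (spaces replaced by "_")
english = ["the_cat_sat_on_the_mat"]
german = ["die_katze_sass_auf_der_matte"]

# combined vocabulary from the top n-grams of both corpora
vocabulary = sorted(set(top_ngrams(english)) | set(top_ngrams(german)))
print(vocabulary)
print(encode(english[0], vocabulary))
print(encode(german[0], vocabulary))
```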


## Character tokens vs. Word tokens
In the last notebook, we used words as "tokens" -- now we will use characters, even accounting for white space (which we have replaced with "_" above). We will use the two example sentences again:

    document 1: "John likes to watch movies. Mary likes movies too."
    document 2: "John also likes to watch football games"
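
As a quick illustration of the difference, here is document 1 split both ways in plain Python. This is a sketch only: it keeps punctuation, whereas the corpus text has had punctuation removed.

```python
doc1 = "John likes to watch movies. Mary likes movies too."

# word tokens: split on whitespace
word_tokens = doc1.lower().split()

# character tokens: every character is a token, with spaces written as "_"
char_tokens = list(doc1.lower().replace(" ", "_"))

print(len(word_tokens), word_tokens[:3])
print(len(char_tokens), char_tokens[:6])
```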

## SparkML feature transformers

The SparkML library includes many data transformers that all support the same API (much in the same vein as Scikit-Learn). Here we are using the [`CountVectorizer`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer), [`NGram`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.NGram) and [`RegexTokenizer`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.RegexTokenizer). 

In [18]:
from pyspark.ml.feature import CountVectorizer, NGram, RegexTokenizer

### Define the transformations

We instantiate the three transformers that will be applied in turn. We will pass the output of one as the input of the next -- in the end our DataFrame will contain a column `vectors` that will be the vectorized version of the documents. 

In [19]:
# raw string, so that \S reaches the regex engine without Python escape warnings
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", gaps=False, pattern=r'\S')
ngram = NGram(n=2, inputCol='tokens', outputCol='ngrams')
count_vectorizer = CountVectorizer(inputCol="ngrams", outputCol="vectors", vocabSize=1000)

Let's see what this does to our test sentences:

In [21]:
test_df = spark.createDataFrame([('John likes to watch movies. Mary likes movies too.',), ('John also likes to watch football games',)], ['text'])

test_df.collect()

[Row(text='John likes to watch movies. Mary likes movies too.'),
 Row(text='John also likes to watch football games')]

**TODO** Figure out how to run the `test_df` through the two transformers and generate a `test_ngram_df`. `show()` the `text`, `tokens`, and `ngrams` columns.

In [22]:
test_ngram_df = ngram.transform(
    regex_tokenizer.transform(test_df)
)
test_ngram_df.show()

+--------------------+--------------------+--------------------+
|                text|              tokens|              ngrams|
+--------------------+--------------------+--------------------+
|John likes to wat...|[j, o, h, n, l, i...|[j o, o h, h n, n...|
|John also likes t...|[j, o, h, n, a, l...|[j o, o h, h n, n...|
+--------------------+--------------------+--------------------+
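
To see where the contents of the `tokens` and `ngrams` columns come from, the two transformers can be imitated in plain Python. This is a rough stand-in, not Spark's implementation; the function names are ours. Note that the raw test sentences still contain spaces rather than "_", so the `\S` pattern simply skips them, which is why `n` and `l` end up adjacent.

```python
import re

def regex_tokenize(text):
    """Imitates the tokenizer settings used above: lowercase the text,
    then treat every match of \\S (one non-whitespace character) as a token."""
    return re.findall(r"\S", text.lower())

def ngrams(tokens, n):
    """Imitates the NGram stage: join each run of n consecutive tokens with a space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

tokens = regex_tokenize("John likes to watch movies. Mary likes movies too.")
bigrams = ngrams(tokens, 2)

print(tokens[:6])
# ['j', 'o', 'h', 'n', 'l', 'i']
print(bigrams[:4])
# ['j o', 'o h', 'h n', 'n l']
```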



**TODO**: Fit the `CountVectorizer` with `n=2` ngrams and store in `test_cv_model`:

In [23]:
test_cv_model = count_vectorizer.fit(test_ngram_df)

In [24]:
test_cv_model.vocabulary

['e s',
 'i k',
 'l i',
 's t',
 'k e',
 't o',
 'c h',
 'm o',
 'j o',
 'i e',
 'o w',
 'a t',
 'o o',
 't c',
 'h n',
 'v i',
 'o v',
 'o h',
 'w a',
 'a l',
 't b',
 's o',
 'm e',
 'l l',
 'y l',
 'h f',
 'm a',
 'g a',
 'n l',
 'o l',
 'f o',
 's .',
 'n a',
 'b a',
 'a m',
 'l g',
 's m',
 'a r',
 'o .',
 'h m',
 '. m',
 'l s',
 'o t',
 'r y']

**TODO**: transform `test_ngram_df` into vectors:

In [25]:
test_cv_model.transform(test_ngram_df).select('vectors').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|vectors                                                                                                                                                                                                           |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(44,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,24,26,28,31,36,37,38,39,40,43],[4.0,2.0,2.0,2.0,2.0,2.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])         |
|(44,[0,1,2,3,4,5,6,8,10,11,12,13,14,17,18,19,20,21,22,23,25,27,29,30,32,33,34,35,41,42],[2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.
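
The sparse vectors above read as `(size, [indices], [values])`: the vocabulary has 44 entries, and each document stores only the counts of the bigrams it actually contains. The plain-Python sketch below rebuilds this for the first test sentence. It is an approximation of what the vectorizer does; in particular, ties in the vocabulary ordering are broken alphabetically here, which need not match Spark's ordering.

```python
import re
from collections import Counter

docs = [
    "John likes to watch movies. Mary likes movies too.",
    "John also likes to watch football games",
]

def bigrams(text):
    tokens = re.findall(r"\S", text.lower())
    return [" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]

doc_bigrams = [bigrams(d) for d in docs]

# vocabulary: most frequent bigram first (tie-breaking is our own choice)
totals = Counter(g for grams in doc_bigrams for g in grams)
vocabulary = sorted(totals, key=lambda g: (-totals[g], g))
index = {g: i for i, g in enumerate(vocabulary)}

def to_sparse(grams):
    """Return (size, indices, values) for one document."""
    counts = Counter(grams)
    indices = sorted(index[g] for g in counts)
    values = [float(counts[vocabulary[i]]) for i in indices]
    return len(vocabulary), indices, values

size, indices, values = to_sparse(doc_bigrams[0])
print(size, len(indices), vocabulary[0], values[0])
# 44 29 e s 4.0
```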

## ML Pipelines

Keeping track of these steps is a bit tedious -- if we wanted to repeat the above steps on different data, we would either have to write a wrapper function or re-execute all the cells again. It would be great if we could create a *pipeline* that encapsulated these steps and all we had to do was provide the inputs and parameters. 

The Spark ML library includes this concept of [Pipelines](https://spark.apache.org/docs/2.2.0/ml-pipeline.html) and we can use it to simplify complex ML workflows.

In [26]:
from pyspark.ml import Pipeline

In [27]:
cv_pipeline = Pipeline(
    stages=[
        regex_tokenizer,
        ngram,
        count_vectorizer,
    ]
)

In [28]:
(
    cv_pipeline.fit(test_df)
               .transform(test_df)
               .show()
)

+--------------------+--------------------+--------------------+--------------------+
|                text|              tokens|              ngrams|             vectors|
+--------------------+--------------------+--------------------+--------------------+
|John likes to wat...|[j, o, h, n, l, i...|[j o, o h, h n, n...|(44,[0,1,2,3,4,5,...|
|John also likes t...|[j, o, h, n, a, l...|[j o, o h, h n, n...|(44,[0,1,2,3,4,5,...|
+--------------------+--------------------+--------------------+--------------------+



This is much more concise and much less error prone! The really cool thing about pipelines is that we can now very easily change the parameters of the different components. Imagine we wanted to fit trigrams (`n=3`) instead of bigrams (`n=2`), and we wanted to change the name of the final column. We can reuse the same pipeline but feed it a *parameter map* specifying the changed parameter values:

In [29]:
# note the dictionaries added to fit() and transform() arguments
(
    cv_pipeline.fit(test_df, {ngram.n:3})
               .transform(test_df, {count_vectorizer.outputCol: 'new_vectors'})
               .show()
)

+--------------------+--------------------+--------------------+--------------------+
|                text|              tokens|              ngrams|         new_vectors|
+--------------------+--------------------+--------------------+--------------------+
|John likes to wat...|[j, o, h, n, l, i...|[j o h, o h n, h ...|(50,[0,1,2,3,4,5,...|
|John also likes t...|[j, o, h, n, a, l...|[j o h, o h n, h ...|(50,[0,1,2,3,4,5,...|
+--------------------+--------------------+--------------------+--------------------+
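
The vector size changes from 44 to 50 when we switch to trigrams because the vocabulary is now the set of distinct trigrams. As a sanity check, we can count distinct n-grams of the two test sentences without Spark; `distinct_ngrams` is a hypothetical helper written for this check.

```python
import re

docs = [
    "John likes to watch movies. Mary likes movies too.",
    "John also likes to watch football games",
]

def distinct_ngrams(docs, n):
    """Number of distinct space-joined character n-grams across all documents."""
    seen = set()
    for doc in docs:
        tokens = re.findall(r"\S", doc.lower())
        seen.update(" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1))
    return len(seen)

for n in (2, 3):
    print(n, distinct_ngrams(docs, n))
# 2 44
# 3 50
```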



### Building a more complex pipeline

For our language classification we want to use N-grams with N from 1 to 3. We can build a function that returns a pipeline with this more complex setup. The procedure is as follows:

1. tokenize as before
2. assemble the ngram transformers to yield n=1, n=2, etc columns
3. vectorize using each set of ngrams giving partial vectors
4. assemble the vectors into one complete feature vector

In [30]:
from pyspark.ml.feature import VectorAssembler

def ngram_vectorize(min_n=1, max_n=1, min_df=1):
    """Use a range of ngrams to vectorize a corpus"""
    # raw string, so that \S reaches the regex engine without Python escape warnings
    tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", gaps=False, pattern=r'\S')
    
    ngrams = []
    count_vectorizers = []
    
    for i in range(min_n, max_n+1):
        ngrams.append(
            NGram(n=i, inputCol='tokens', outputCol='ngrams_'+str(i))
        )
        count_vectorizers.append(
            CountVectorizer(inputCol='ngrams_'+str(i), outputCol='vectors_'+str(i), vocabSize=1000, minDF=min_df)
        )
    
    assembler = VectorAssembler(
        inputCols=['vectors_'+str(i) for i in range(min_n, max_n+1)], outputCol='features')
    
    return Pipeline(stages=[tokenizer] + ngrams + count_vectorizers + [assembler])

In [31]:
ngram_vectorize(1,3).fit(test_df).transform(test_df).select('features').show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
+-----------------------------------------------------------------------

### Preparing the DataFrames and models

For our language classifier we will use just three languages (English, German, and French). We need to create a DataFrame that is filtered to include only those languages. 

In addition, we will need this step of transforming raw string documents into vectors when we try the classifier on new data. We should therefore save the fitted NGram model for later. 

In [32]:
lang_df = df.filter(df.language.isin('en', 'de', 'fr')).cache()

In [33]:
ngram_model = ngram_vectorize(1,3, min_df=10).fit(lang_df)

In [34]:
ngram_model.transform(lang_df).select('features').first()

Row(features=SparseVector(2142, {0: 1734.0, 1: 896.0, 2: 609.0, 3: 588.0, 4: 570.0, 5: 473.0, 6: 495.0, 7: 502.0, 8: 414.0, 9: 485.0, 10: 321.0, 11: 291.0, 12: 235.0, 13: 162.0, 14: 158.0, 15: 142.0, 16: 138.0, 17: 205.0, 18: 143.0, 19: 146.0, 20: 112.0, 21: 61.0, 22: 60.0, 23: 10.0, 24: 6.0, 25: 8.0, 26: 5.0, 28: 4.0, 29: 3.0, 30: 4.0, 31: 2.0, 32: 3.0, 33: 4.0, 34: 5.0, 35: 4.0, 36: 1.0, 37: 1.0, 142: 361.0, 143: 259.0, 144: 244.0, 145: 223.0, 146: 214.0, 147: 181.0, 148: 141.0, 149: 150.0, 150: 141.0, 151: 91.0, 152: 99.0, 153: 116.0, 154: 117.0, 155: 97.0, 156: 90.0, 157: 79.0, 158: 110.0, 159: 131.0, 160: 154.0, 161: 96.0, 162: 80.0, 163: 68.0, 164: 79.0, 165: 58.0, 166: 70.0, 167: 84.0, 168: 65.0, 169: 76.0, 170: 53.0, 171: 76.0, 172: 57.0, 173: 66.0, 174: 71.0, 175: 76.0, 176: 44.0, 177: 77.0, 178: 45.0, 179: 60.0, 180: 80.0, 181: 56.0, 182: 59.0, 183: 46.0, 184: 46.0, 185: 59.0, 186: 48.0, 187: 42.0, 188: 25.0, 189: 75.0, 190: 52.0, 191: 60.0, 192: 49.0, 193: 28.0, 194: 33.0, 1
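
The feature vector above has size 2142, and its indices jump from 37 to 142. This appears consistent with a unigram vocabulary of 142 entries followed by two blocks of 1000 (the `vocabSize` cap) for bigrams and trigrams. The sketch below shows the kind of concatenation the assembler stage performs, using plain dictionaries as sparse vectors. The block sizes are our inference from the output, and the trigram entry is made up.

```python
def assemble(sparse_vectors):
    """Concatenate (size, {index: value}) sparse vectors by shifting
    each vector's indices by the total size of the vectors before it."""
    offset = 0
    merged = {}
    for size, entries in sparse_vectors:
        for idx, val in entries.items():
            merged[offset + idx] = val
        offset += size
    return offset, merged

unigrams = (142, {0: 1734.0, 1: 896.0})   # first entries from the output above
bigrams = (1000, {0: 361.0, 1: 259.0})    # these land at indices 142 and 143
trigrams = (1000, {5: 12.0})              # made-up entry for illustration

size, features = assemble([unigrams, bigrams, trigrams])
print(size, features[142], features[1147])
# 2142 361.0 12.0
```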

## Building the classifier

We have successfully transformed the dataset into a representation that we can (almost) feed into a classifier. What we still need is a label column, as well as the final stage of the pipeline that will fit the actual model. 

To generate labels from the language column, we will use the `StringIndexer` as a part of our pipeline. For the classification we will use the simplest possible `LogisticRegression` -- once you've convinced yourself that you know how it works, go ahead and experiment with other [classifiers](http://spark.apache.org/docs/latest/api/python/pyspark.ml#module-pyspark.ml.classification).
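
By default, `StringIndexer` assigns indices in order of descending frequency, so the most common string gets label `0.0`. A plain-Python sketch of that mapping is shown below; the language counts are a toy example rather than the real corpus counts, and the alphabetical tie-breaking is our own choice.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# toy column of languages (made-up proportions)
languages = ["en"] * 5 + ["fr"] * 2 + ["de"] * 1

mapping = fit_string_indexer(languages)
print(mapping)
# {'en': 0.0, 'fr': 1.0, 'de': 2.0}
```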

In [35]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer

**TODO:** Set up a `classification_pipeline`. Use the N-gram model we defined above as a starting stage, followed by a `StringIndexer` and a `LogisticRegression` classifier. Make sure you read the documentation on these!

Note that we can use the pre-trained N-gram model -- the `Pipeline` will automatically infer that the stage is already complete and will only use it in the transformation step. 
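
To make this behaviour concrete, here is a toy pipeline in plain Python. It is only an analogy: Spark distinguishes estimators from transformers by their type, while this sketch just checks for a `fit` method, and all class names are made up.

```python
class Lowercase:
    """A transformer: it has transform() but nothing to fit."""
    def transform(self, rows):
        return [row.lower() for row in rows]

class VocabularyEstimator:
    """An estimator: fit() learns a vocabulary and returns a fitted model."""
    def fit(self, rows):
        return VocabularyModel(sorted(set("".join(rows))))

class VocabularyModel:
    """A fitted model, which is itself a transformer."""
    def __init__(self, vocabulary):
        self.vocabulary = vocabulary

    def transform(self, rows):
        return [[row.count(ch) for ch in self.vocabulary] for row in rows]

def fit_pipeline(stages, rows):
    """Fit stages in order; stages without fit() are used as they are."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        fitted.append(model)
        rows = model.transform(rows)  # output of one stage feeds the next
    return fitted

# a model fitted earlier is passed through unchanged instead of being refitted
prefit = VocabularyEstimator().fit(["abc"])
fitted = fit_pipeline([Lowercase(), prefit], ["CAB", "xyz"])
print(fitted[1] is prefit, prefit.vocabulary)
# True ['a', 'b', 'c']
```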

In [36]:
classification_pipeline = Pipeline(
    stages=[ngram_model, 
            StringIndexer(inputCol='language', outputCol='label'),
            LogisticRegression(regParam=0.002, elasticNetParam=1, maxIter=10)
           ]
)

Run the classifier! The fitting will take a while, so you may want to run this on a subset of the data first.

In [50]:
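# Subsample 20% of the data (without replacement, so that no document
# can end up in both sets), then split into training and test sets
training, test = lang_df.sample(False, 0.2).randomSplit([0.8, 0.2])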

In [51]:
classifier = classification_pipeline.fit(training)

In [52]:
# check the predictions 
for lang in ['en', 'fr', 'de']:
    print('Predictions for {0}'.format(lang))
    (classifier.transform(
        test.filter(test.language == lang))
            .select('label', 'probability', 'prediction')
            .show(10, truncate=False))

Predictions for en
+-----+---------------------------------------------------------------+----------+
|label|probability                                                    |prediction|
+-----+---------------------------------------------------------------+----------+
|0.0  |[0.9988005907780809,7.88610734359324E-4,4.1079848755980176E-4] |0.0       |
|0.0  |[0.998917107716628,7.33096968988312E-4,3.4979531438355903E-4]  |0.0       |
|0.0  |[0.9989542530960642,6.742910478926779E-4,3.7145585604316547E-4]|0.0       |
|0.0  |[0.9979812116689254,0.0014214961543689822,5.972921767057092E-4]|0.0       |
|0.0  |[0.9976958073687584,0.00162556839444595,6.786242367955829E-4]  |0.0       |
|0.0  |[0.99886258717009,7.499241334248441E-4,3.874886964851842E-4]   |0.0       |
|0.0  |[0.9990570458254298,6.331701031364094E-4,3.097840714338575E-4] |0.0       |
|0.0  |[0.9983657577006078,0.001131550441907882,5.026918574843947E-4] |0.0       |
|0.0  |[0.9990957214984193,5.871605212797993E-4,3.171179803008901E-4

You should be seeing mostly good agreement between `label` and `prediction`.
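
Eyeballing the first ten rows only goes so far. One simple way to quantify the agreement is to count matching `(label, prediction)` pairs. The sketch below works on a plain list of pairs; the pairs here are made up, and in the notebook they would have to be collected from the transformed test DataFrame first.

```python
from collections import Counter

def accuracy(pairs):
    """Fraction of (label, prediction) pairs that agree."""
    pairs = list(pairs)
    return sum(1 for label, prediction in pairs if label == prediction) / len(pairs)

def confusion_counts(pairs):
    """How often each (label, prediction) combination occurs."""
    return Counter(pairs)

# made-up pairs for illustration
pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (2.0, 2.0)]

print(accuracy(pairs))
# 0.8
print(confusion_counts(pairs)[(1.0, 0.0)])
# 1
```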

### Improving the model and continuing the exploration of the data

We have completed the basic model training, but many improvements are possible. One obvious improvement is hyperparameter tuning -- check out the [docs](http://spark.apache.org/docs/latest/ml-tuning.html#ml-tuning-model-selection-and-hyperparameter-tuning) for some examples and try it out!
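
At its core, a grid search just tries every combination of candidate parameter values. The plain-Python sketch below enumerates such a grid for the two regularization parameters used above; the candidate values are arbitrary examples, and fitting and scoring each candidate is left out.

```python
from itertools import product

# arbitrary example values for the two parameters we set by hand above
grid = {
    "regParam": [0.001, 0.002, 0.01],
    "elasticNetParam": [0.0, 0.5, 1.0],
}

# one dict per combination of values
candidates = [dict(zip(grid, combo)) for combo in product(*grid.values())]

print(len(candidates))
# 9
print(candidates[0])
# {'regParam': 0.001, 'elasticNetParam': 0.0}
```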

Some other ideas for things you could do with this dataset: 

* try other [classifiers that are included in MLlib](http://spark.apache.org/docs/latest/mllib-classification-regression.html)
* build a regression model to predict year of publication (may be better with word ngrams)
* do clustering on the English books and see if sub-groups of the language pop up
* cluster by author -- do certain authors write in similar ways?

In [53]:
spark.stop()