Spark MLlib - Feature Transformers (Tokenizer, HashingTF, IDF)

Key Insights

  • Feature transformers in Spark MLlib convert raw text into numerical vectors that machine learning algorithms can process, with Tokenizer splitting text, HashingTF creating term frequency vectors, and IDF weighting terms by importance
  • The HashingTF transformer uses the hashing trick to map tokens to fixed-size feature vectors without maintaining a vocabulary dictionary, making it memory-efficient for large-scale text processing
  • Combining TF-IDF transformers creates a pipeline that identifies distinctive terms across documents, essential for text classification, clustering, and similarity analysis in distributed environments

Understanding the Text-to-Vector Pipeline

Text data requires transformation into numerical representations before machine learning algorithms can process it. Spark MLlib provides three core transformers that work together: Tokenizer breaks text into words, HashingTF converts words into frequency vectors, and IDF weights those frequencies by term importance across the corpus.

Here’s a basic example showing all three transformers in action:

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

spark = SparkSession.builder.appName("TFIDFExample").getOrCreate()

# Sample dataset
documents = spark.createDataFrame([
    (0, "Apache Spark is a unified analytics engine"),
    (1, "Spark provides high-level APIs in Java Python and Scala"),
    (2, "Machine learning library MLlib is built on Spark"),
    (3, "Spark streaming processes real-time data streams")
], ["id", "text"])

# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(documents)

# Term Frequency
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

# Inverse Document Frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("id", "text", "features").show(truncate=False)

This pipeline transforms raw text into TF-IDF feature vectors. Each transformer serves a specific purpose in the conversion process.

Tokenizer: Breaking Text into Components

The Tokenizer converts text to lowercase and splits it into individual tokens using whitespace as the delimiter. It’s the first step in text preprocessing and handles basic text segmentation.

from pyspark.ml.feature import Tokenizer, RegexTokenizer

# Standard tokenizer
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

# Regex tokenizer for more control
regexTokenizer = RegexTokenizer(
    inputCol="sentence",
    outputCol="words",
    pattern="\\W+",  # Split on non-word characters
    minTokenLength=3  # Filter tokens shorter than 3 characters
)

sentences = spark.createDataFrame([
    (0, "Hi, I'm learning Spark MLlib!"),
    (1, "Natural language processing (NLP) is fascinating."),
    (2, "TF-IDF stands for Term Frequency-Inverse Document Frequency.")
], ["id", "sentence"])

# Compare outputs
standard_output = tokenizer.transform(sentences)
regex_output = regexTokenizer.transform(sentences)

print("Standard Tokenizer:")
standard_output.select("sentence", "words").show(truncate=False)

print("\nRegex Tokenizer (min length 3):")
regex_output.select("sentence", "words").show(truncate=False)

RegexTokenizer provides more flexibility for handling punctuation, case sensitivity, and minimum token lengths. Use it when you need precise control over tokenization rules.
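With the default gaps=True, RegexTokenizer treats the pattern as a delimiter to split on (and lowercases by default). The equivalent logic can be mirrored in plain Python; this sketch (an illustration, not Spark code) shows what pattern="\\W+" with minTokenLength=3 does to a sentence:

```python
import re

def regex_tokenize(sentence, pattern=r"\W+", min_token_length=3, to_lowercase=True):
    """Mirror RegexTokenizer's default gaps=True behavior: the pattern is a
    delimiter to split on, short tokens are dropped, text is lowercased."""
    if to_lowercase:
        sentence = sentence.lower()
    tokens = re.split(pattern, sentence)
    return [t for t in tokens if len(t) >= min_token_length]

print(regex_tokenize("Hi, I'm learning Spark MLlib!"))
# "I'm" splits into "i" and "m", both dropped by the length filter
```

This is why minTokenLength is useful after splitting on \\W+: apostrophes and punctuation produce short fragments that the length filter discards.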

HashingTF: Converting Tokens to Frequency Vectors

HashingTF applies the hashing trick to map tokens into a fixed-size feature vector. Instead of building a vocabulary dictionary, it hashes each token to an index position and counts occurrences.
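Conceptually, the hashing trick reduces to "hash each token, take the index modulo the vector size, increment that slot." A plain-Python sketch of the idea (Spark’s HashingTF uses MurmurHash3; the sum-of-character-codes hash below is a stand-in for illustration only):

```python
def hashing_tf(tokens, num_features=20):
    """Map tokens to a fixed-size count vector with no vocabulary dictionary.
    Different tokens can collide on the same index."""
    vec = [0] * num_features
    for token in tokens:
        # Stand-in hash; Spark uses MurmurHash3 for cross-run stability
        idx = sum(ord(c) for c in token) % num_features
        vec[idx] += 1
    return vec

v = hashing_tf(["spark", "mllib", "spark"], num_features=8)
print(v)  # "spark" hashes to the same slot twice, so one entry is 2
```

Because there is no dictionary, memory usage is fixed by num_features regardless of vocabulary size, which is the core trade-off: bounded memory in exchange for possible collisions.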

from pyspark.ml.feature import HashingTF

# Create HashingTF with different configurations
hashingTF_small = HashingTF(inputCol="words", outputCol="features_small", numFeatures=10)
hashingTF_large = HashingTF(inputCol="words", outputCol="features_large", numFeatures=100)

documents = spark.createDataFrame([
    (0, ["spark", "mllib", "machine", "learning"]),
    (1, ["spark", "streaming", "data", "processing"]),
    (2, ["machine", "learning", "algorithms", "spark"])
], ["id", "words"])

# Transform with different feature sizes
result_small = hashingTF_small.transform(documents)
result_large = hashingTF_large.transform(documents)

print("Small feature space (10 features):")
result_small.select("words", "features_small").show(truncate=False)

print("\nLarge feature space (100 features):")
result_large.select("words", "features_large").show(truncate=False)

The numFeatures parameter controls the hash space size. Larger values reduce collision probability but increase memory usage. For production systems with large vocabularies, use 2^18 (262,144, the default) or 2^20 (1,048,576) features.

Hash collisions occur when different tokens map to the same index. Here’s how to analyze collision impact:

from pyspark.sql.functions import size, array_distinct

# Calculate unique tokens vs feature space
df = spark.createDataFrame([
    (0, ["data", "science", "machine", "learning", "python", "spark", "hadoop"])
], ["id", "words"])

hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=5)
result = hashingTF.transform(df)

print(f"Unique tokens: {df.select(size(array_distinct('words'))).first()[0]}")
print(f"Feature space: {hashingTF.getNumFeatures()}")
print("Collisions likely due to small feature space")
result.select("features").show(truncate=False)

IDF: Weighting Terms by Importance

IDF (Inverse Document Frequency) reduces the weight of common terms and increases the weight of rare terms. Unlike Tokenizer and HashingTF, it is an estimator: calling fit on the corpus computes document frequencies and returns an IDFModel, which then transforms the data.

from pyspark.ml.feature import IDF

# Prepare term frequency vectors
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=50)

corpus = spark.createDataFrame([
    (0, ["spark", "is", "great"]),
    (1, ["spark", "provides", "apis"]),
    (2, ["machine", "learning", "is", "important"]),
    (3, ["spark", "is", "fast"])
], ["id", "words"])

tf = hashingTF.transform(corpus)

# Fit IDF model
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=1)
idfModel = idf.fit(tf)
tfidf = idfModel.transform(tf)

# Compare TF vs TF-IDF
print("Term Frequency only:")
tf.select("id", "words", "rawFeatures").show(truncate=False)

print("\nTF-IDF weighted:")
tfidf.select("id", "words", "features").show(truncate=False)
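Spark computes the smoothed IDF as idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number containing term t. A quick sanity check of the corpus above in plain Python (not Spark code):

```python
import math

m = 4  # documents in the corpus above

def idf(doc_freq, num_docs=m):
    # Spark MLlib's smoothed IDF: natural log of (m + 1) / (df + 1)
    return math.log((num_docs + 1) / (doc_freq + 1))

# "spark" appears in 3 of 4 documents -> low weight
print(f"idf('spark')   = {idf(3):.4f}")
# "machine" appears in only 1 document -> higher weight
print(f"idf('machine') = {idf(1):.4f}")
```

The smoothing (+1 in numerator and denominator) keeps the weight finite even for terms that appear in every document or in none.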

The minDocFreq parameter zeroes out terms appearing in fewer than the specified number of documents: such terms receive an IDF weight of 0, so their entries vanish from the feature vectors. This removes very rare terms that might be typos or noise:

# Filter rare terms
idf_filtered = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=2  # Term must appear in at least 2 documents
)

idfModel_filtered = idf_filtered.fit(tf)
tfidf_filtered = idfModel_filtered.transform(tf)

print("TF-IDF with rare term filtering:")
tfidf_filtered.select("id", "features").show(truncate=False)
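Under the hood, a term whose document frequency falls below minDocFreq gets an IDF of 0, so its entries disappear from every feature vector while the vector length stays the same. The arithmetic, sketched in plain Python (an illustration, not Spark code):

```python
import math

def idf_with_min_df(doc_freq, num_docs, min_doc_freq=2):
    # Terms below the threshold are weighted to zero, not removed
    if doc_freq < min_doc_freq:
        return 0.0
    return math.log((num_docs + 1) / (doc_freq + 1))

# In the 4-document corpus above, "great" appears once, "spark" three times
print(idf_with_min_df(1, 4))  # zeroed out by minDocFreq=2
print(idf_with_min_df(3, 4))  # kept, with the usual smoothed IDF weight
```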

Building a Complete Text Classification Pipeline

Combine these transformers into a machine learning pipeline for text classification:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Training data
training = spark.createDataFrame([
    (0, "spark mllib machine learning library", 1.0),
    (1, "spark streaming real time processing", 1.0),
    (2, "hadoop mapreduce batch processing", 0.0),
    (3, "hive data warehouse sql queries", 0.0),
    (4, "spark sql structured data processing", 1.0),
    (5, "pig data flow scripting language", 0.0)
], ["id", "text", "label"])

# Define pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])

# Train model
model = pipeline.fit(training)

# Test predictions
test = spark.createDataFrame([
    (6, "spark machine learning algorithms"),
    (7, "hadoop distributed file system")
], ["id", "text"])

predictions = model.transform(test)
predictions.select("text", "prediction", "probability").show(truncate=False)

Performance Optimization Techniques

For large-scale text processing, optimize transformer performance:

# Persist intermediate results
tf_data = hashingTF.transform(wordsData)
tf_data.persist()  # Cache term frequency vectors

# Fit IDF on cached data
idfModel = idf.fit(tf_data)
tfidf_data = idfModel.transform(tf_data)

# Repartition for better parallelism
documents_repartitioned = documents.repartition(200)

# Use binary term frequencies for presence/absence
hashingTF_binary = HashingTF(
    inputCol="words",
    outputCol="features",
    binary=True  # Use 1/0 instead of counts
)

The binary option works well for short documents where term presence matters more than frequency. Repartitioning balances workload across executors, especially important when processing millions of documents.
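The binary option caps each hashed slot at 1, recording presence rather than frequency. In plain-Python terms (an illustration, not Spark code):

```python
def to_binary(tf_vector):
    # binary=True keeps only presence/absence per hashed slot
    return [1 if count > 0 else 0 for count in tf_vector]

print(to_binary([0, 3, 1, 0, 2]))  # [0, 1, 1, 0, 1]
```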

These transformers form the foundation of text analytics in Spark. Master their parameters and combinations to build efficient, scalable NLP pipelines that handle production workloads.
