Managing Spark dataframes in Python

Below is a quick example of using Apache Spark (2.0) DataFrames to manipulate data. The sample data is a file of JSON lines like:

{"description": "255/40 ZR17 94W", "ean": "EAN: 4981910401193", "season": "tires_season summer", "price": "203,98", "model": "Michelin Pilot Sport PS2 255/40 R17", "id": "MPN: 2351610"}
{"description": "225/55 R17 101V XL", "ean": "EAN: 5452000438744", "season": "tires_season summer", "price": "120,98", "model": "Pirelli P Zero 205/45 R17", "id": "MPN: 530155"}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, regexp_replace, trim


# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'spark-warehouse'

spark = SparkSession \
    .builder \
    .appName("Python Spark  ") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

records_orig = spark.read.json("scraped_tyres_data.json")
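# a quick look at the inferred schema and a few rows:
# records_orig.printSchema()
# records_orig.show(5, truncate=False)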

## removing bad records (empty ids and retreaded -- "rinnovati" -- tyres)
records = records_orig \
  .filter(col("id") != '') \
  .filter(regexp_extract('description', '(rinnovati)', 1) == '')

## saving bad records  
records_orig.subtract(records).coalesce(1).write.csv("bad-records.csv", sep=";")
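# how many records were filtered out:
# print(records_orig.count() - records.count())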

# extract new features
# tyre size pattern, e.g. "225/55 R17 101V": width/ratio R diameter load_index speed_index
# ("Z?" also covers ZR sizes such as "255/40 ZR17 94W" in the sample data above)
regexp_size = r"(\d+)/(\d+) Z?R(\d+) (\d+)(\w+)\s*"

records = records \
  .withColumn("width",       regexp_extract("description", regexp_size, 1)) \
  .withColumn("ratio",       regexp_extract("description", regexp_size, 2)) \
  .withColumn("diameter",    regexp_extract("description", regexp_size, 3)) \
  .withColumn("load_index",  regexp_extract("description", regexp_size, 4)) \
  .withColumn("speed_index", regexp_extract("description", regexp_size, 5)) \
  .withColumn("brand",       regexp_extract("model", r"^(\w+) ", 1)) \
  .withColumn("season",      trim(regexp_replace("season", "tires_season", ""))) \
  .withColumn("id",          trim(regexp_replace("id", "MPN: ", ""))) \
  .withColumn("ean",         trim(regexp_replace("ean", "EAN: ", ""))) \
  .withColumn("runflat",     regexp_extract("description", "(runflat)", 1)) \
  .withColumn("mfs",         regexp_extract("description", "(MFS|FSL|bordo di protezione|bordino di protezione)", 1)) \
  .withColumn("xl",          regexp_extract("description", r" (XL|RF)\s*", 1)) \
  .withColumn("chiodabile",  regexp_extract("description", r"(chiodabile)\s*", 1))

## extracting and saving all distinct season values
records.select("season").distinct().coalesce(1).write.csv("season_values", sep=";")

# misc
# records.columns   # show columns
# records.groupBy("brand").count().show()
# records.groupBy("brand").count().filter("count > 100").show(20,False)
#
# renaming all columns before joining dataframes with the same column names
# records_renamed = records.select(*(col(x).alias(x + '_renamed') for x in records.columns))
# joining the two dataframes
# records.join(records_renamed, records.ean == records_renamed.ean_renamed)
#
#
# saving data to several formats
records.coalesce(1).write.csv("result.csv", sep=";")
records.write.json("result.json")
records.write.parquet("result.parquet")
records.write.format("com.databricks.spark.avro").save("result.avro")
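
The Avro writer assumes the spark-avro package is on the classpath (e.g. launching with --packages com.databricks:spark-avro_2.11:3.1.0, adjusting the version to your Spark build). As a final sanity check, one of the outputs can be read back, for example the Parquet one:

records_check = spark.read.parquet("result.parquet")
print(records_check.count())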