PySpark Cheat Sheet


  • A quick reference guide to the most commonly used patterns and functions in PySpark SQL.

  • Importing Functions & Types
    • Easily reference these as F.my_function() and T.my_type() below

        from pyspark.sql import functions as F, types as T
      
  • Filtering
    • Filter on equals condition

        df = df.filter(df.is_adult == 'Y')
      
    • Filter on >, <, >=, <= condition

        df = df.filter(df.age > 25)
      
    • Multiple conditions: combine with & (and), | (or), ~ (not), and wrap each condition in parentheses

        df = df.filter((df.age > 25) & (df.is_adult == 'Y'))
      
  • Joins
    • Left join in another dataset

        df = df.join(person_lookup_table, 'person_id', 'left')
      
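    • Replace lookup codes with their descriptions in one line (handy when you have many lookup tables)

        def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value):
            # Swap df1[df1_key] for df2[df2_value] where the keys match;
            # unmatched rows keep their original value.
            # Assumes df1_key and df2_key are different column names.
            lookup = df2.select(df2_key, df2_value)
            return (
                df1
                .join(lookup, df1[df1_key] == lookup[df2_key], 'left')
                .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key)))
                .drop(df2_key, df2_value)
            )
      
        df = lookup_and_replace(people, pay_codes, 'id', 'pay_code_id', 'pay_code_desc')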
      
  • Creating New Columns
    • Add a new static column

        df = df.withColumn('status', F.lit('PASS'))
      
    • Construct a new dynamic column

        df = df.withColumn('full_name', F.when(
            (df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, F.lit(' '), df.lname)
        ).otherwise(F.lit('N/A')))
      
  • Coalescing Values
    • Take the first value that is not null

        df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))
      
  • Casting, Nulls & Duplicates
    • Cast a column to a different type

        df = df.withColumn('price', df.price.cast(T.DoubleType()))
      
    • Replace nulls with a specific value, per column

        df = df.fillna({
            'first_name': 'Tom',
            'age': 0,
        })
      
    • Drop duplicate rows in a dataset (distinct)

        df = df.dropDuplicates()
      
    • Drop duplicate rows, but consider only specific columns

        df = df.dropDuplicates(['name', 'height'])
      
  • Column Operations
    • Pick which columns to keep, optionally rename some

        df = df.select(
            'name',
            'age',
            F.col('dob').alias('date_of_birth'),
        )
      
    • Remove columns

        df = df.drop('mod_dt', 'mod_username')
      
    • Rename a column

        df = df.withColumnRenamed('dob', 'date_of_birth')
      
    • Keep only the columns that also occur in another dataset

        df = df.select(*[c for c in df.columns if c in df2.columns])
      
    • Batch Rename/Clean Columns

        for col in df.columns:
            df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))
      
  • String Operations: String Filters
    • Contains - col.contains(string)

        df = df.filter(df.name.contains('o'))
      
    • Starts With - col.startswith(string)

        df = df.filter(df.name.startswith('Al'))
      
    • Ends With - col.endswith(string)

        df = df.filter(df.name.endswith('ice'))
      
    • Is Null - col.isNull()

        df = df.filter(df.is_adult.isNull())
      
    • Is Not Null - col.isNotNull()

        df = df.filter(df.first_name.isNotNull())
      
    • Like - col.like(string_with_sql_wildcards)

        df = df.filter(df.name.like('Al%'))
      
    • Regex Like - col.rlike(regex)

        df = df.filter(df.name.rlike('[A-Z]*ice$'))
      
    • Is In List - col.isin(*cols)

        df = df.filter(df.name.isin('Bob', 'Mike'))
      
  • String Functions
    • Substring - col.substr(startPos, length), where startPos is 1-based

        df = df.withColumn('short_id', df.id.substr(1, 10))
      
    • Trim - F.trim(col)

        df = df.withColumn('name', F.trim(df.name))
      
        # Left Pad - F.lpad(col, len, pad)
        # Right Pad - F.rpad(col, len, pad)
        df = df.withColumn('id', F.lpad('id', 4, '0'))
      
        # Left Trim - F.ltrim(col)
        # Right Trim - F.rtrim(col)
        df = df.withColumn('id', F.ltrim('id'))
      
    • Concatenate - F.concat(*cols)

        df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname'))
      
    • Concatenate with Separator/Delimiter - F.concat_ws(sep, *cols)

        df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname'))
      
    • Regex Replace - F.regexp_replace(str, pattern, replacement)

        df = df.withColumn('id', F.regexp_replace('id', '0F1(.*)', '1F1-$1'))
      
    • Regex Extract - F.regexp_extract(str, pattern, idx)

        # First run of digits in id ('' if there is none)
        df = df.withColumn('id', F.regexp_extract('id', '[0-9]+', 0))
      
  • Number Operations
    • Round - F.round(col, scale=0)

        df = df.withColumn('price', F.round('price', 0))
      
    • Floor - F.floor(col)

        df = df.withColumn('price', F.floor('price'))
      
    • Ceiling - F.ceil(col)

        df = df.withColumn('price', F.ceil('price'))
      
  • Array Operations
    • Column Array - F.array(*cols)

        df = df.withColumn('full_name', F.array('fname', 'lname'))
      
    • Empty Array - F.array(*cols)

        df = df.withColumn('empty_array_column', F.array([]))
      
  • Aggregation Operations

        # Count - F.count(col)
        # Sum - F.sum(col)
        # Mean - F.mean(col)
        # Max - F.max(col)
        # Min - F.min(col)
        df = df.groupBy('gender').agg(F.max('age').alias('max_age_by_gender'))
      
        # Collect Set - F.collect_set(col)
        # Collect List - F.collect_list(col)
        df = df.groupBy('age').agg(F.collect_set('name').alias('person_names'))
    
  • Advanced Operations

    Repartitioning

    • Repartition - df.repartition(num_output_partitions)

        df = df.repartition(1)
      

    UDFs (User Defined Functions)

    • Multiply each row’s age column by two

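        # Declare the return type (F.udf defaults to StringType) and pass nulls through
        times_two_udf = F.udf(lambda x: x * 2 if x is not None else None, T.IntegerType())
        df = df.withColumn('age', times_two_udf(df.age))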
      
    • Randomly choose a value to use as a row’s name

        import random
      
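        # Mark it nondeterministic so Spark does not assume repeated calls give the same value
        random_name_udf = F.udf(
            lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna']), T.StringType()
        ).asNondeterministic()
        df = df.withColumn('name', random_name_udf())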