Add more 🔥 to Apache Spark!
Silex is a Data Engineering library that extends PySpark.
You don't need another class: just use PySpark as usual, and your DataFrames gain new functions!
pip install spark-silex
import silex
from pyspark.sql import DataFrame
# extends your DataFrame with silex functions!
# if for some reason you don't want to do that, check the 'Without extending DataFrames' section below
silex.extend_dataframes()
df: DataFrame = ... # your regular Spark DataFrame
df: DataFrame = df.drop_col_if_na(max=2)  # new function! and still a regular Spark DataFrame!
# scroll for more information!
- IDE auto-completions
- show off test coverage
- more documentation and examples
- more functions
# assertions => raise an Exception if the expectation is not met /!\
def expect_column(self, col: str) -> DataFrame: ...
def expect_columns(self, cols: Union[str, List[str]]) -> DataFrame: ...
def expect_distinct_values_equal_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> DataFrame: ...
def expect_distinct_values_in_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> DataFrame: ...
def expect_min_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_avg_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_max_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> DataFrame: ...
def expect_unique_id(self, cols: Union[str, List[str]]) -> DataFrame: ...
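A minimal sketch of how these chain: each expect_* call returns the DataFrame on success, so a pipeline can end with a data quality gate (the column names and bounds below are illustrative assumptions, not part of the library):
# fails fast: the first unmet expectation raises an Exception
df = (
    df.expect_columns(["id", "value"])
      .expect_unique_id("id")
      .expect_min_value_between("value", min=0.0, max=10.0)
)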
# boolean checks
def has_column(self, col: str) -> bool: ...
def has_columns(self, cols: Union[str, List[str]]) -> bool: ...
def has_distinct_values_equal_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> bool: ...
def has_distinct_values_in_set(self, cols: Union[str, List[str]], values: Collection[Any]) -> bool: ...
def has_min_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_avg_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_max_value_between(self, cols: Union[str, List[str]], min: Any, max: Any) -> bool: ...
def has_unique_id(self, cols: Union[str, List[str]]) -> bool: ...
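Unlike the assertions above, these return a bool, so you can branch instead of failing. A sketch under that assumption, with a plain PySpark fallback:
# deduplicate only when the id column is not already unique
if not df.has_unique_id("id"):
    df = df.dropDuplicates(["id"])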
# dates
def with_date_column(self, col: str, fmt: str, new_col: Optional[str] = None) -> DataFrame: ...
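For example, as in the full walkthrough further down:
# parse a US-formatted string column into a proper date column
df = df.with_date_column(col="date_US", fmt="MM/dd/yyyy", new_col="date")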
# drop
def drop_col_if_na(self, max: int) -> DataFrame: ...
def drop_col_if_not_distinct(self, min: int) -> DataFrame: ...
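A sketch of both drop helpers; the exact semantics of max and min (e.g. whether the bounds are inclusive) are assumptions based on the names:
# drop columns containing more than 2 NA values (assumed meaning of max)
df = df.drop_col_if_na(max=2)
# drop columns with fewer than 2 distinct values (assumed meaning of min)
df = df.drop_col_if_not_distinct(min=2)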
# filters
def filter_on_range(self, col: str, from_: Any, to: Any, ...) -> DataFrame: ...
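For instance, using only the parameters shown above (the trailing ... hides further options):
# keep rows whose 'value' falls within the given range
df = df.filter_on_range(col="value", from_=1.0, to=3.0)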
# joins
def join_closest_date(self, other: DataFrame, ...) -> DataFrame: ...
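A sketch under the assumption that the elided ... parameters configure which date columns to match on:
# join each row of df to the row of other_df with the closest date
df = df.join_closest_date(other=other_df)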
- Python 3.8 or above
- Spark 3 or above
pip install spark-silex
import silex
from pyspark.sql import DataFrame, SparkSession
# extends your DataFrame with silex functions!
# if for some reason you don't want to do that, check the next example
silex.extend_dataframes()
spark = SparkSession.builder.getOrCreate()
data = [
(0, "01/29/2022", "a", 1.0),
(1, "01/30/2022", "b", 2.0),
(2, "01/31/2022", "c", 3.0),
]
df: DataFrame = spark.createDataFrame(data, schema=["id", "date_US", "text", "value"])
df.show()
# +---+----------+----+-----+
# | id| date_US|text|value|
# +---+----------+----+-----+
# | 0|01/29/2022| a| 1.0|
# | 1|01/30/2022| b| 2.0|
# | 2|01/31/2022| c| 3.0|
# +---+----------+----+-----+
df = df.with_date_column(col="date_US", new_col="date", fmt="MM/dd/yyyy")
df.show()
df.printSchema()
# +---+----------+----+-----+----------+
# | id| date_US|text|value| date|
# +---+----------+----+-----+----------+
# | 0|01/29/2022| a| 1.0|2022-01-29|
# | 1|01/30/2022| b| 2.0|2022-01-30|
# | 2|01/31/2022| c| 3.0|2022-01-31|
# +---+----------+----+-----+----------+
#
# root
# |-- id: long (nullable = true)
# |-- date_US: string (nullable = true)
# |-- text: string (nullable = true)
# |-- value: double (nullable = true)
# |-- date: date (nullable = true)
from silex.fn.date import with_date_column
# same as before, except you pass the DataFrame explicitly
df = with_date_column(df=df, col="date_US", new_col="date", fmt="MM/dd/yyyy")
# install Poetry and Python 3.8 (using pyenv, for instance)
cd silex
poetry env use path/to/python3.8 # e.g. ~/.pyenv/versions/3.8.12/bin/python
poetry shell
poetry install
pre-commit install
make help
# or open the Makefile to learn about the available development commands