deagar committed
Commit cfbd02f · 1 Parent(s): 309b3f6

Updated assessment notebook, added solutions

Files changed (2)
  1. notebooks/assesment.ipynb +314 -15
  2. notebooks/solutions.ipynb +308 -0
notebooks/assesment.ipynb CHANGED
@@ -4,27 +4,326 @@
4
  "cell_type": "markdown",
5
  "metadata": {},
6
  "source": [
7
- "# PySpark Data Engineering Assessment\n",
8
  "\n",
9
- "## Tasks\n",
10
  "\n",
11
- "1. Read the CSV data (in `../data/titanic.csv`) into:\n",
12
- " - a Pandas DataFrame\n",
13
- " - a Spark DataFrame\n",
14
  "\n",
15
- "2. Perform some data cleaning (e.g., drop rows with nulls in `Age` or `Fare`).\n",
16
  "\n",
17
- "3. Run basic aggregations:\n",
18
- " - Find the average Fare by Pclass\n",
19
- " - Find survival rate by Sex and Pclass\n",
20
- " - etc.\n",
21
  "\n",
22
- "4. Write the cleaned Spark DataFrame to a Parquet file.\n",
 
23
  "\n",
24
- "5. Bonus tasks:\n",
25
- " - Create a temporary Spark SQL table/view, query it with SQL syntax.\n",
26
- " - Provide quick EDA (e.g., distribution of Ages).\n",
27
- "\n"
28
  ]
29
  }
30
  ],
 
4
  "cell_type": "markdown",
5
  "metadata": {},
6
  "source": [
7
+ "# PySpark Data Engineering Assessment (Extended)\n",
8
  "\n",
9
+ "Welcome! In this notebook, you'll practice:\n",
10
  "\n",
11
+ "1. Reading the **Titanic CSV** in **Pandas** and **PySpark**.\n",
12
+ "2. **Splitting** a single dataset into two DataFrames and **merging** them back together in both Pandas and Spark.\n",
13
+ "3. Data cleaning and aggregations in Pandas and Spark.\n",
14
+ "4. Writing and reading **Parquet** files.\n",
15
+ "5. Creating a **PySpark UDF** that leverages a **lightweight transformer model** to compute embeddings for passenger names.\n",
16
  "\n",
17
+ "---\n",
18
  "\n",
19
+ "## Dataset\n",
 
 
 
20
  "\n",
21
+ "- **`titanic.csv`**: This file is in the `../data/` directory, containing columns such as:\n",
22
+ " - `PassengerId`, `Name`, `Sex`, `Age`, `Fare`, `Survived`, etc.\n",
23
  "\n",
24
+ "We will:\n",
25
+ "1. Read `titanic.csv` into Pandas and Spark.\n",
26
+ "2. Split the original DataFrame into two subsets (simulating two “tables”).\n",
27
+ "3. Demonstrate merges/joins in Pandas and Spark using these subsets.\n",
28
+ "4. Perform data cleaning and transformations.\n",
29
+ "5. Write to Parquet.\n",
30
+ "6. Implement a Spark UDF to generate embeddings for passenger names.\n",
31
+ "\n",
32
+ "---\n",
33
+ "\n",
34
+ "## Instructions\n",
35
+ "\n",
36
+ "Throughout the notebook, you'll see `TODO` sections. Please fill in the required code. Feel free to add extra cells or explanations as needed.\n",
37
+ "\n",
38
+ "When finished, please save or export this notebook and submit according to your instructions.\n",
39
+ "\n",
40
+ "Let's begin!\n"
41
+ ]
42
+ },
43
+ {
44
+ "cell_type": "code",
45
+ "execution_count": null,
46
+ "metadata": {},
47
+ "outputs": [],
48
+ "source": [
49
+ "# 1. Imports and Spark Setup\n",
50
+ "\n",
51
+ "import os\n",
52
+ "import pandas as pd\n",
53
+ "\n",
54
+ "# PySpark imports\n",
55
+ "from pyspark.sql import SparkSession\n",
56
+ "from pyspark.sql import functions as F\n",
57
+ "from pyspark.sql.types import *\n",
58
+ "\n",
59
+ "# Create/initialize Spark session\n",
60
+ "spark = SparkSession.builder \\\n",
61
+ " .appName(\"TitanicAssessmentExtended\") \\\n",
62
+ " .getOrCreate()\n",
63
+ "\n",
64
+ "print(\"Spark version:\", spark.version)\n"
65
+ ]
66
+ },
67
+ {
68
+ "cell_type": "code",
69
+ "execution_count": null,
70
+ "metadata": {},
71
+ "outputs": [],
72
+ "source": [
73
+ "# 2. Read the Titanic CSV (Pandas & Spark)\n",
74
+ "# ========================================\n",
75
+ "\n",
76
+ "# Path to the CSV file\n",
77
+ "titanic_csv_path = os.path.join(\"..\", \"data\", \"titanic.csv\")\n",
78
+ "\n",
79
+ "# 2.1 TODO: Read 'titanic.csv' into a Pandas DataFrame (pd_df)\n",
80
+ "# pd_df = ?\n",
81
+ "\n",
82
+ "# Inspect the shape and first few rows\n",
83
+ "# print(\"pd_df shape:\", pd_df.shape)\n",
84
+ "# display(pd_df.head())\n",
85
+ "\n",
86
+ "# 2.2 TODO: Read 'titanic.csv' into a Spark DataFrame (spark_df)\n",
87
+ "# spark_df = ?\n",
88
+ "\n",
89
+ "# Check schema and row count\n",
90
+ "# spark_df.printSchema()\n",
91
+ "# print(\"spark_df count:\", spark_df.count())\n"
92
+ ]
93
+ },
94
+ {
95
+ "cell_type": "code",
96
+ "execution_count": null,
97
+ "metadata": {},
98
+ "outputs": [],
99
+ "source": [
100
+ "# 3. Split Data into Two Subsets for Merging/Joining\n",
101
+ "# ==================================================\n",
102
+ "# Instead of using a second CSV, we'll simulate it by splitting the original dataset\n",
103
+ "# into two DataFrames:\n",
104
+ "# df_part1: subset of columns -> PassengerId, Name, Sex, Age\n",
105
+ "# df_part2: subset of columns -> PassengerId, Fare, Survived, Pclass\n",
106
+ "#\n",
107
+ "# We then merge these two separate DataFrames in both Pandas and Spark.\n",
108
+ "\n",
109
+ "# 3.1 Pandas Split\n",
110
+ "# ----------------\n",
111
+ "\n",
112
+ "# TODO: Create two new DataFrames from pd_df:\n",
113
+ "# pd_part1 = pd_df[[\"PassengerId\", \"Name\", \"Sex\", \"Age\"]]\n",
114
+ "# pd_part2 = pd_df[[\"PassengerId\", \"Fare\", \"Survived\", \"Pclass\"]]\n",
115
+ "\n",
116
+ "# pd_part1 = ?\n",
117
+ "# pd_part2 = ?\n",
118
+ "\n",
119
+ "# display(pd_part1.head())\n",
120
+ "# display(pd_part2.head())\n"
121
+ ]
122
+ },
123
+ {
124
+ "cell_type": "code",
125
+ "execution_count": null,
126
+ "metadata": {},
127
+ "outputs": [],
128
+ "source": [
129
+ "# 3.2 Spark Split\n",
130
+ "# ---------------\n",
131
+ "# TODO: Create two new DataFrames from spark_df:\n",
132
+ "# spark_part1 = spark_df.select(\"PassengerId\", \"Name\", \"Sex\", \"Age\")\n",
133
+ "# spark_part2 = spark_df.select(\"PassengerId\", \"Fare\", \"Survived\", \"Pclass\")\n",
134
+ "\n",
135
+ "# spark_part1 = ?\n",
136
+ "# spark_part2 = ?\n",
137
+ "\n",
138
+ "# spark_part1.show(5)\n",
139
+ "# spark_part2.show(5)\n"
140
+ ]
141
+ },
142
+ {
143
+ "cell_type": "code",
144
+ "execution_count": null,
145
+ "metadata": {},
146
+ "outputs": [],
147
+ "source": [
148
+ "# 4. Merging / Joining the Split DataFrames\n",
149
+ "# =========================================\n",
150
+ "\n",
151
+ "# 4.1 Merge in Pandas\n",
152
+ "# -------------------\n",
153
+ "# TODO: Merge pd_part1 and pd_part2 on \"PassengerId\"\n",
154
+ "# We'll call the merged DataFrame \"pd_merged\".\n",
155
+ "#\n",
156
+ "# pd_merged = pd_part1.merge(pd_part2, on=\"PassengerId\", how=\"inner\")\n",
157
+ "\n",
158
+ "# pd_merged = ?\n",
159
+ "# print(\"pd_merged shape:\", pd_merged.shape)\n",
160
+ "# display(pd_merged.head())\n"
161
+ ]
162
+ },
163
+ {
164
+ "cell_type": "code",
165
+ "execution_count": null,
166
+ "metadata": {},
167
+ "outputs": [],
168
+ "source": [
169
+ "# 4.2 Join in Spark\n",
170
+ "# -----------------\n",
171
+ "# TODO: Join spark_part1 with spark_part2 on \"PassengerId\"\n",
172
+ "# We'll call the joined DataFrame \"spark_merged\".\n",
173
+ "#\n",
174
+ "# spark_merged = spark_part1.join(spark_part2, on=\"PassengerId\", how=\"inner\")\n",
175
+ "\n",
176
+ "# spark_merged = ?\n",
177
+ "# print(\"spark_merged count:\", spark_merged.count())\n",
178
+ "# spark_merged.show(5)\n",
179
+ "# spark_merged.printSchema()\n"
180
+ ]
181
+ },
182
+ {
183
+ "cell_type": "code",
184
+ "execution_count": null,
185
+ "metadata": {},
186
+ "outputs": [],
187
+ "source": [
188
+ "# 5. Data Cleaning\n",
189
+ "# ================\n",
190
+ "# We'll focus on the merged DataFrames. For instance, drop rows that have missing\n",
191
+ "# values in certain columns like 'Age' or 'Fare'.\n",
192
+ "\n",
193
+ "# 5.1 TODO: Pandas DataFrame cleaning\n",
194
+ "# Create a cleaned version, 'pd_merged_clean',\n",
195
+ "# dropping nulls in [\"Age\", \"Fare\"].\n",
196
+ "\n",
197
+ "# pd_merged_clean = ?\n",
198
+ "\n",
199
+ "# print(\"Before dropna:\", pd_merged.shape)\n",
200
+ "# print(\"After dropna:\", pd_merged_clean.shape)\n",
201
+ "# pd_merged_clean.head()\n"
202
+ ]
203
+ },
204
+ {
205
+ "cell_type": "code",
206
+ "execution_count": null,
207
+ "metadata": {},
208
+ "outputs": [],
209
+ "source": [
210
+ "# 5.2 TODO: Spark DataFrame cleaning\n",
211
+ "# Create a cleaned version, 'spark_merged_clean',\n",
212
+ "# dropping nulls in [\"Age\", \"Fare\"].\n",
213
+ "\n",
214
+ "# spark_merged_clean = ?\n",
215
+ "\n",
216
+ "# print(\"spark_merged count BEFORE dropna:\", spark_merged.count())\n",
217
+ "# print(\"spark_merged_clean count AFTER dropna:\", spark_merged_clean.count())\n",
218
+ "# spark_merged_clean.show(5)\n"
219
+ ]
220
+ },
221
+ {
222
+ "cell_type": "code",
223
+ "execution_count": null,
224
+ "metadata": {},
225
+ "outputs": [],
226
+ "source": [
227
+ "# 6. Basic Aggregations\n",
228
+ "# =====================\n",
229
+ "# Let's do a couple of group-by queries to glean insights.\n",
230
+ "\n",
231
+ "# 6.1 TODO: Pandas - Average fare by Pclass\n",
232
+ "# e.g. group by 'Pclass' and compute mean fare in pd_merged_clean\n",
233
+ "\n",
234
+ "# pd_avg_fare = ?\n",
235
+ "# pd_avg_fare\n"
236
+ ]
237
+ },
238
+ {
239
+ "cell_type": "code",
240
+ "execution_count": null,
241
+ "metadata": {},
242
+ "outputs": [],
243
+ "source": [
244
+ "# 6.2 TODO: Spark - Survival rate by Sex and Pclass\n",
245
+ "# e.g. groupBy(\"Sex\", \"Pclass\").agg(F.avg(\"Survived\"))\n",
246
+ "#\n",
247
+ "# spark_survival_rate = ?\n",
248
+ "# spark_survival_rate.show()\n"
249
+ ]
250
+ },
251
+ {
252
+ "cell_type": "code",
253
+ "execution_count": null,
254
+ "metadata": {},
255
+ "outputs": [],
256
+ "source": [
257
+ "# 7. Writing to Parquet\n",
258
+ "# =====================\n",
259
+ "# We'll write the cleaned Spark DataFrame to a Parquet file (e.g. \"titanic_merged_clean.parquet\").\n",
260
+ "\n",
261
+ "# 7.1 TODO: Write spark_merged_clean to Parquet\n",
262
+ "# e.g., spark_merged_clean.write.mode(\"overwrite\").parquet(\"titanic_merged_clean.parquet\")\n",
263
+ "\n",
264
+ "# 7.2 TODO: Read it back into a new Spark DataFrame called 'spark_parquet_df'\n",
265
+ "# spark_parquet_df = ?\n",
266
+ "\n",
267
+ "# print(\"spark_parquet_df count:\", spark_parquet_df.count())\n",
268
+ "# spark_parquet_df.show(5)\n"
269
+ ]
270
+ },
271
+ {
272
+ "cell_type": "code",
273
+ "execution_count": null,
274
+ "metadata": {},
275
+ "outputs": [],
276
+ "source": [
277
+ "# 8. Bonus 1: Create a Temp View and Query\n",
278
+ "# ========================================\n",
279
+ "# 8.1 TODO: Create a temp view with 'spark_merged_clean' (e.g. \"titanic_merged\")\n",
280
+ "# spark_merged_clean.createOrReplaceTempView(\"titanic_merged\")\n",
281
+ "\n",
282
+ "# 8.2 TODO: Spark SQL query example\n",
283
+ "# result_df = spark.sql(\"SELECT ... FROM titanic_merged GROUP BY ...\")\n",
284
+ "# result_df.show()\n"
285
+ ]
286
+ },
287
+ {
288
+ "cell_type": "code",
289
+ "execution_count": null,
290
+ "metadata": {},
291
+ "outputs": [],
292
+ "source": [
293
+ "# 9. Bonus 2: Transformer Embeddings UDF\n",
294
+ "# ======================================\n",
295
+ "# We'll demonstrate a simple approach using a lightweight transformer model to embed passenger names.\n",
296
+ "# This is optional, but shows advanced usage of Spark UDFs.\n",
297
+ "\n",
298
+ "# Requirements: e.g. \"transformers\" or \"sentence-transformers\" in your environment.\n",
299
+ "# from transformers import pipeline\n",
300
+ "# embedding_pipeline = pipeline(\"feature-extraction\", model=\"distilbert-base-uncased\")\n",
301
+ "# OR\n",
302
+ "# from sentence_transformers import SentenceTransformer\n",
303
+ "# model = SentenceTransformer(\"all-MiniLM-L6-v2\")\n",
304
+ "\n",
305
+ "# 9.1 TODO: import / load the model/pipeline\n",
306
+ "# e.g.\n",
307
+ "# from transformers import pipeline\n",
308
+ "# embedding_pipeline = pipeline(\"feature-extraction\", model=\"distilbert-base-uncased\")\n",
309
+ "\n",
310
+ "# 9.2 Define a Python function that takes a passenger name (string) -> returns a list of floats\n",
311
+ "\n",
312
+ "# def get_name_embedding(name: str) -> List[float]:\n",
313
+ "# # TODO: use embedding_pipeline or model to produce an embedding\n",
314
+ "# # embedding = ?\n",
315
+ "# # NOTE: verify shape (embedding might be list of lists)\n",
316
+ "# return ???\n",
317
+ "\n",
318
+ "# 9.3 Wrap that function in a PySpark UDF\n",
319
+ "# from pyspark.sql.functions import udf\n",
320
+ "# from pyspark.sql.types import ArrayType, FloatType\n",
321
+ "# udf_get_name_embedding = udf(get_name_embedding, ArrayType(FloatType()))\n",
322
+ "\n",
323
+ "# 9.4 Apply the UDF to create a new column 'NameEmbedding' in spark_merged_clean\n",
324
+ "# spark_embedded = spark_merged_clean.withColumn(\"NameEmbedding\", udf_get_name_embedding(F.col(\"Name\")))\n",
325
+ "\n",
326
+ "# spark_embedded.select(\"Name\", \"NameEmbedding\").show(truncate=False)\n"
327
  ]
328
  }
329
  ],
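Section 9 above suggests `sentence-transformers` (`all-MiniLM-L6-v2`) as an alternative to the `transformers` feature-extraction pipeline, but the solutions file below only implements the pipeline route. Here is a minimal sketch of the sentence-transformers route, assuming that package is installed and that `spark_merged_clean` has been built as in section 5; the helper and variable names are illustrative, not part of the committed notebooks.

```python
# Hedged sketch: name embeddings via sentence-transformers instead of the
# transformers feature-extraction pipeline. Assumes `sentence-transformers`
# is installed and `spark_merged_clean` exists as described in the notebook.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer

# "all-MiniLM-L6-v2" is the model the assessment text mentions.
model = SentenceTransformer("all-MiniLM-L6-v2")

def get_name_embedding(name: str) -> list:
    # encode() returns a 1-D NumPy array for a single string; cast to plain
    # Python floats so the values fit ArrayType(FloatType()).
    return [float(x) for x in model.encode(name)]

udf_get_name_embedding = udf(get_name_embedding, ArrayType(FloatType()))

spark_embedded = spark_merged_clean.withColumn(
    "NameEmbedding", udf_get_name_embedding(F.col("Name"))
)
spark_embedded.select("Name", "NameEmbedding").show(5, truncate=False)
```

On a cluster, each executor needs its own copy of the model, so keep it small or batch the work with a pandas UDF (a sketch follows the solutions file below).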
notebooks/solutions.ipynb ADDED
@@ -0,0 +1,308 @@
1
+ {
2
+ "cells": [
3
+ {
4
+ "cell_type": "markdown",
5
+ "metadata": {},
6
+ "source": [
7
+ "## Solutions Guide"
8
+ ]
9
+ },
10
+ {
11
+ "cell_type": "code",
12
+ "execution_count": null,
13
+ "metadata": {},
14
+ "outputs": [],
15
+ "source": [
16
+ "import os\n",
17
+ "import pandas as pd\n",
18
+ "\n",
19
+ "# PySpark imports\n",
20
+ "from pyspark.sql import SparkSession\n",
21
+ "from pyspark.sql import functions as F\n",
22
+ "from pyspark.sql.types import *\n",
23
+ "\n",
24
+ "# Create or get Spark session\n",
25
+ "spark = SparkSession.builder \\\n",
26
+ " .appName(\"TitanicAssessmentExtended\") \\\n",
27
+ " .getOrCreate()\n",
28
+ "\n",
29
+ "print(\"Spark version:\", spark.version)\n"
30
+ ]
31
+ },
32
+ {
33
+ "cell_type": "markdown",
34
+ "metadata": {},
35
+ "source": [
36
+ "Explanation:\n",
37
+ "\n",
38
+ " We import pandas, pyspark.sql modules, and create a Spark session named \"TitanicAssessmentExtended\".\n",
39
+ " Checking spark.version helps confirm which version of Spark is running."
40
+ ]
41
+ },
42
+ {
43
+ "cell_type": "code",
44
+ "execution_count": null,
45
+ "metadata": {},
46
+ "outputs": [],
47
+ "source": [
48
+ "#Read in data \n",
49
+ "titanic_csv_path = os.path.join(\"..\", \"data\", \"titanic.csv\")\n",
50
+ "\n",
51
+ "# 2.1 Read into a Pandas DataFrame\n",
52
+ "pd_df = pd.read_csv(titanic_csv_path)\n",
53
+ "\n",
54
+ "print(\"pd_df shape:\", pd_df.shape)\n",
55
+ "display(pd_df.head())\n"
56
+ ]
57
+ },
58
+ {
59
+ "cell_type": "markdown",
60
+ "metadata": {},
61
+ "source": [
62
+ "We use pd.read_csv(...) to read the Titanic data into a pd.DataFrame.\n",
63
+ ".shape gives the (rows, columns).\n",
64
+ ".head() shows the top few rows."
65
+ ]
66
+ },
67
+ {
68
+ "cell_type": "code",
69
+ "execution_count": null,
70
+ "metadata": {},
71
+ "outputs": [],
72
+ "source": [
73
+ "# 2.2 Read into a Spark DataFrame\n",
74
+ "spark_df = spark.read.csv(titanic_csv_path, header=True, inferSchema=True)\n",
75
+ "\n",
76
+ "spark_df.printSchema()\n",
77
+ "print(\"spark_df count:\", spark_df.count())\n",
78
+ "spark_df.show(5)\n"
79
+ ]
80
+ },
81
+ {
82
+ "cell_type": "markdown",
83
+ "metadata": {},
84
+ "source": [
85
+ "We specify header=True so Spark knows the first row is column headers, and inferSchema=True so it automatically detects column types.\n",
86
+ ".printSchema() reveals the inferred schema.\n",
87
+ ".count() and .show() let us see row counts and sample rows."
88
+ ]
89
+ },
90
+ {
91
+ "cell_type": "code",
92
+ "execution_count": null,
93
+ "metadata": {},
94
+ "outputs": [],
95
+ "source": [
96
+ "#Split data into subsets\n",
97
+ "\n",
98
+ "pd_part1 = pd_df[[\"PassengerId\", \"Name\", \"Sex\", \"Age\"]]\n",
99
+ "pd_part2 = pd_df[[\"PassengerId\", \"Fare\", \"Survived\", \"Pclass\"]]\n",
100
+ "\n",
101
+ "display(pd_part1.head())\n",
102
+ "display(pd_part2.head())\n"
103
+ ]
104
+ },
105
+ {
106
+ "cell_type": "code",
107
+ "execution_count": null,
108
+ "metadata": {},
109
+ "outputs": [],
110
+ "source": [
111
+ "spark_part1 = spark_df.select(\"PassengerId\", \"Name\", \"Sex\", \"Age\")\n",
112
+ "spark_part2 = spark_df.select(\"PassengerId\", \"Fare\", \"Survived\", \"Pclass\")\n",
113
+ "\n",
114
+ "spark_part1.show(5)\n",
115
+ "spark_part2.show(5)\n"
116
+ ]
117
+ },
118
+ {
119
+ "cell_type": "code",
120
+ "execution_count": null,
121
+ "metadata": {},
122
+ "outputs": [],
123
+ "source": [
124
+ "#Merging/Joining split dataframes \n",
125
+ "\n",
126
+ "pd_merged = pd_part1.merge(pd_part2, on=\"PassengerId\", how=\"inner\")\n",
127
+ "print(\"pd_merged shape:\", pd_merged.shape)\n",
128
+ "display(pd_merged.head())\n"
129
+ ]
130
+ },
131
+ {
132
+ "cell_type": "markdown",
133
+ "metadata": {},
134
+ "source": [
135
+ "on=\"PassengerId\" merges the two tables by the PassengerId key.\n",
136
+ "how=\"inner\" ensures rows only appear if they exist in both subsets (should be all matching in this case)."
137
+ ]
138
+ },
139
+ {
140
+ "cell_type": "code",
141
+ "execution_count": null,
142
+ "metadata": {},
143
+ "outputs": [],
144
+ "source": [
145
+ "#Join in spark\n",
146
+ "\n",
147
+ "spark_merged = spark_part1.join(spark_part2, on=\"PassengerId\", how=\"inner\")\n",
148
+ "print(\"spark_merged count:\", spark_merged.count())\n",
149
+ "spark_merged.show(5)\n",
150
+ "spark_merged.printSchema()\n"
151
+ ]
152
+ },
153
+ {
154
+ "cell_type": "markdown",
155
+ "metadata": {},
156
+ "source": [
157
+ "Spark uses .join(df2, on=\"PassengerId\", how=\"inner\").\n",
158
+ "spark_merged.show(5) and .printSchema() confirm the merge result."
159
+ ]
160
+ },
161
+ {
162
+ "cell_type": "code",
163
+ "execution_count": null,
164
+ "metadata": {},
165
+ "outputs": [],
166
+ "source": [
167
+ "#Data cleaning\n",
168
+ "\n",
169
+ "pd_merged_clean = pd_merged.dropna(subset=[\"Age\", \"Fare\"])\n",
170
+ "print(\"Before dropna:\", pd_merged.shape)\n",
171
+ "print(\"After dropna:\", pd_merged_clean.shape)\n",
172
+ "pd_merged_clean.head()"
173
+ ]
174
+ },
175
+ {
176
+ "cell_type": "code",
177
+ "execution_count": null,
178
+ "metadata": {},
179
+ "outputs": [],
180
+ "source": [
181
+ "#Spark data cleaning\n",
182
+ "spark_merged_clean = spark_merged.dropna(subset=[\"Age\", \"Fare\"])\n",
183
+ "print(\"spark_merged count BEFORE dropna:\", spark_merged.count())\n",
184
+ "print(\"spark_merged_clean count AFTER dropna:\", spark_merged_clean.count())\n",
185
+ "spark_merged_clean.show(5)\n"
186
+ ]
187
+ },
188
+ {
189
+ "cell_type": "code",
190
+ "execution_count": null,
191
+ "metadata": {},
192
+ "outputs": [],
193
+ "source": [
194
+ "#Basic aggregations\n",
195
+ "\n",
196
+ "pd_avg_fare = pd_merged_clean.groupby(\"Pclass\")[\"Fare\"].mean()\n",
197
+ "pd_avg_fare"
198
+ ]
199
+ },
200
+ {
201
+ "cell_type": "code",
202
+ "execution_count": null,
203
+ "metadata": {},
204
+ "outputs": [],
205
+ "source": [
206
+ "#Spark survival rate by sex and pclass\n",
207
+ "\n",
208
+ "spark_survival_rate = (\n",
209
+ " spark_merged_clean\n",
210
+ " .groupBy(\"Sex\", \"Pclass\")\n",
211
+ " .agg(F.avg(\"Survived\").alias(\"survival_rate\"))\n",
212
+ ")\n",
213
+ "spark_survival_rate.show()\n"
214
+ ]
215
+ },
216
+ {
217
+ "cell_type": "code",
218
+ "execution_count": null,
219
+ "metadata": {},
220
+ "outputs": [],
221
+ "source": [
222
+ "#Write spark df to parquet\n",
223
+ "\n",
224
+ "spark_merged_clean.write.mode(\"overwrite\").parquet(\"titanic_merged_clean.parquet\")"
225
+ ]
226
+ },
227
+ {
228
+ "cell_type": "code",
229
+ "execution_count": null,
230
+ "metadata": {},
231
+ "outputs": [],
232
+ "source": [
233
+ "#Read parquet back in\n",
234
+ "\n",
235
+ "spark_parquet_df = spark.read.parquet(\"titanic_merged_clean.parquet\")\n",
236
+ "print(\"spark_parquet_df count:\", spark_parquet_df.count())\n",
237
+ "spark_parquet_df.show(5)\n"
238
+ ]
239
+ },
240
+ {
241
+ "cell_type": "code",
242
+ "execution_count": null,
243
+ "metadata": {},
244
+ "outputs": [],
245
+ "source": [
246
+ "#Bonus - create a temp view/query\n",
247
+ "\n",
248
+ "spark_merged_clean.createOrReplaceTempView(\"titanic_merged\")\n",
249
+ "\n",
250
+ "result_df = spark.sql(\n",
251
+ " \"\"\"\n",
252
+ " SELECT Pclass,\n",
253
+ " COUNT(*) AS passenger_count,\n",
254
+ " AVG(Age) AS avg_age\n",
255
+ " FROM titanic_merged\n",
256
+ " GROUP BY Pclass\n",
257
+ " ORDER BY Pclass\n",
258
+ " \"\"\")\n",
259
+ "result_df.show()\n"
260
+ ]
261
+ },
262
+ {
263
+ "cell_type": "code",
264
+ "execution_count": null,
265
+ "metadata": {},
266
+ "outputs": [],
267
+ "source": [
268
+ "# Example imports (make sure 'transformers' is installed)\n",
269
+ "from transformers import pipeline\n",
270
+ "embedding_pipeline = pipeline(\"feature-extraction\", model=\"distilbert-base-uncased\")\n",
271
+ "\n",
272
+ "# Example function to get the name embedding\n",
273
+ "def get_name_embedding(name: str):\n",
274
+ " # The pipeline will return a list of lists of floats.\n",
275
+ " # Typically shape: (1, sequence_length, hidden_size).\n",
276
+ " # We'll take the first token or perhaps average them.\n",
277
+ " output = embedding_pipeline(name)\n",
278
+ " # output[0] is shape [sequence_length, hidden_size]\n",
279
+ " # let's do a simple average across the sequence dimension:\n",
280
+ " token_embeddings = output[0]\n",
281
+ " # average across tokens:\n",
282
+ " mean_embedding = [float(sum(x) / len(x)) for x in zip(*token_embeddings)]\n",
283
+ " return mean_embedding\n",
284
+ "\n",
285
+ "# Convert this Python function to a Spark UDF\n",
286
+ "from pyspark.sql.functions import udf\n",
287
+ "from pyspark.sql.types import ArrayType, FloatType\n",
288
+ "\n",
289
+ "udf_get_name_embedding = udf(get_name_embedding, ArrayType(FloatType()))\n",
290
+ "\n",
291
+ "# Apply it to add a new column\n",
292
+ "spark_embedded = spark_merged_clean.withColumn(\n",
293
+ " \"NameEmbedding\",\n",
294
+ " udf_get_name_embedding(F.col(\"Name\"))\n",
295
+ ")\n",
296
+ "\n",
297
+ "spark_embedded.select(\"Name\", \"NameEmbedding\").show(truncate=False)\n"
298
+ ]
299
+ }
300
+ ],
301
+ "metadata": {
302
+ "language_info": {
303
+ "name": "python"
304
+ }
305
+ },
306
+ "nbformat": 4,
307
+ "nbformat_minor": 2
308
+ }
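The row-wise UDF in the final solution cell calls the embedding pipeline once per passenger name, which is fine for the Titanic dataset but slow at any real scale. Below is a hedged sketch of a batched alternative using a vectorized pandas UDF; it assumes Spark 3.x with pyarrow and the `sentence-transformers` package, reuses `spark_merged_clean` from the solutions above, and is not part of the committed notebooks.

```python
# Hedged sketch: batch the embedding work with a vectorized pandas UDF instead
# of one model call per row. Assumes Spark 3.x with pyarrow available, plus
# `sentence-transformers`, and `spark_merged_clean` from the solutions notebook.
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # any small sentence encoder works

@pandas_udf(ArrayType(FloatType()))
def embed_names(names: pd.Series) -> pd.Series:
    # One encode() call per Arrow batch of names rather than one per row.
    vectors = model.encode(names.tolist())
    return pd.Series([[float(x) for x in vec] for vec in vectors])

spark_embedded_fast = spark_merged_clean.withColumn(
    "NameEmbedding", embed_names(F.col("Name"))
)
spark_embedded_fast.select("Name", "NameEmbedding").show(5, truncate=False)
```

The Arrow batch size (`spark.sql.execution.arrow.maxRecordsPerBatch`) then controls how many names are encoded per call, and the per-row Python and serialization overhead of the plain UDF largely disappears.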