# Databricks notebook source
# MAGIC %run ./includes/includes
# COMMAND ----------
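# MAGIC %md
# MAGIC #### The notebook parameters below are read from widgets, which the calling job is assumed to supply. For interactive runs, a sketch like the following (with hypothetical default values) creates them first.
# COMMAND ----------
# Hypothetical defaults for interactive runs; a scheduled job would normally supply these widgets.
dbutils.widgets.text('01.start_date', '2021-10-01')
dbutils.widgets.text('02.end_date', '2023-03-01')
dbutils.widgets.text('03.hours_to_forecast', '4')
dbutils.widgets.text('04.promote_model', 'No')
# COMMAND ----------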
start_date = str(dbutils.widgets.get('01.start_date'))
end_date = str(dbutils.widgets.get('02.end_date'))
hours_to_forecast = int(dbutils.widgets.get('03.hours_to_forecast'))
promote_model = str(dbutils.widgets.get('04.promote_model')).lower() == 'yes'
print(start_date,end_date,hours_to_forecast, promote_model)
print("YOUR CODE HERE...")
# COMMAND ----------
# MAGIC %md
# MAGIC #### Importing All the required Libraries
# COMMAND ----------
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, DoubleType
from pyspark.sql.functions import *
# COMMAND ----------
# MAGIC %md
# MAGIC #### Building schema for the columns of Bike Trips
# COMMAND ----------
bike_schema = StructType([
StructField("ride_id", StringType(), True),
StructField("rideable_type", StringType(), True),
StructField("started_at", StringType(), True),
StructField("ended_at", StringType(), True),
StructField("start_station_name", StringType(), True),
StructField("start_station_id", StringType(), True),
StructField("end_station_name", StringType(), True),
StructField("end_station_id", StringType(), True),
StructField("start_lat", StringType(), True),
StructField("start_lng", StringType(), True),
StructField("end_lat", StringType(), True),
StructField("end_lng", StringType(), True),
StructField("member_casual", StringType(), True)])
# COMMAND ----------
# MAGIC %md
# MAGIC #### Setting the input/output paths for the history bike trips data
# COMMAND ----------
input_path = "dbfs:/FileStore/tables/raw/bike_trips/"
output_path = "dbfs:/FileStore/tables/G07/bronze/bike_trips_history"
# COMMAND ----------
# MAGIC %md
# MAGIC #### Loading the historical bike trip data and saving it in Delta format
# COMMAND ----------
query = (
spark
.readStream
.format("csv")
.schema(bike_schema)
.option("header", "true")
.load(input_path)
.writeStream
.format("delta")
.option("path", output_path)
.option("checkpointLocation", output_path + "/checkpoint")
.trigger(availableNow=True)
.start()
)
query.awaitTermination()
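# COMMAND ----------
# MAGIC %md
# MAGIC #### A minimal sanity check (sketch): reading the bronze Delta output back to confirm the stream landed data
# COMMAND ----------
# Sketch: read the freshly written bronze table and confirm its row count and schema.
bronze_bike_trips = spark.read.format("delta").load(output_path)
print(f"bike_trips_history rows: {bronze_bike_trips.count()}")
bronze_bike_trips.printSchema()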
# COMMAND ----------
# MAGIC %md
# MAGIC #### Building schema for the columns of weather data
# COMMAND ----------
weather_schema = StructType([
StructField("dt", IntegerType(), True),
StructField("temp", DoubleType(), True),
StructField("feels_like", DoubleType(), True),
StructField("pressure", IntegerType(), True),
StructField("humidity", IntegerType(), True),
StructField("dew_point", DoubleType(), True),
StructField("uvi", DoubleType(), True),
StructField("clouds", IntegerType(), True),
StructField("visibility", IntegerType(), True),
StructField("wind_speed", DoubleType(), True),
StructField("wind_deg", IntegerType(), True),
StructField("pop", DoubleType(), True),
StructField("snow_1h", DoubleType(), True),
StructField("id", IntegerType(), True),
StructField("main", StringType(), True),
StructField("description", StringType(), True),
StructField("icon", StringType(), True),
StructField("loc", StringType(), True),
StructField("lat", DoubleType(), True),
StructField("lon", DoubleType(), True),
StructField("timezone", StringType(), True),
StructField("timezone_offset", IntegerType(), True)
])
# COMMAND ----------
# MAGIC %md
# MAGIC #### Setting the input/output paths for the weather data
# COMMAND ----------
input_path2 = "dbfs:/FileStore/tables/raw/weather/"
output_path2 = "dbfs:/FileStore/tables/G07/bronze/weather_history"
# COMMAND ----------
# MAGIC %md
# MAGIC #### Loading the weather data and saving it in Delta format
# COMMAND ----------
query = (
spark
.readStream
.format("csv")
.schema(weather_schema)
.option("header", "true")
.load(input_path2)
.writeStream
.format("delta")
.option("path", output_path2)
.option("checkpointLocation", output_path2 + "/checkpoint")
.trigger(availableNow=True)
.start()
)
query.awaitTermination()
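# COMMAND ----------
# MAGIC %md
# MAGIC #### A minimal sanity check (sketch): confirming the row count and ingested time range in the weather bronze output
# COMMAND ----------
# Sketch: read the weather bronze output back and report its row count and time span.
bronze_weather = spark.read.format("delta").load(output_path2)
bronze_weather.selectExpr("count(*) AS rows", "from_unixtime(min(dt)) AS first_obs", "from_unixtime(max(dt)) AS last_obs").show()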
# COMMAND ----------
# MAGIC %md
# MAGIC #### Updating shuffle partitions to match the cluster's default parallelism
# COMMAND ----------
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)
print(spark.conf.get("spark.sql.shuffle.partitions"))
# COMMAND ----------
# MAGIC %md
# MAGIC #### Listing the files present in our directory to check how it looks
# COMMAND ----------
files=dbutils.fs.ls("dbfs:/FileStore/tables/G07")
for file in files:
print(file.name)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Reading the bike trips data from the bronze path and creating a SQL temp view for querying it
# COMMAND ----------
delta_path = "dbfs:/FileStore/tables/G07/bronze/bike_trips_history"
spark.read.format("delta").load(delta_path).createOrReplaceTempView("bike_trip_history_delta")
# COMMAND ----------
# MAGIC %md
# MAGIC #### Analyzing the minimum and maximum dates available in our data, which span "2021-11-01 00:00:01" to "2023-03-31 23:59:57", and confirming the counts: there are 40,399,055 unique ride_id values, matching the 40,399,055 total rides, so ride_id is a unique identifier.
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT min(started_at) as started_at,max(started_at) as ended_date, count(*) as totaldata
# MAGIC FROM bike_trip_history_delta
# COMMAND ----------
# MAGIC %md
# MAGIC #### Creating a temp view with the start and end timestamps truncated to the hour. These columns are used later to group bike trips and compute hourly counts. We keep only rows where our station is either the start or the end of the trip.
# COMMAND ----------
spark.sql(
"""
select
date_format(date_trunc('hour',started_at),'yyyy-MM-dd HH:mm:ss') as startdate,
date_format(date_trunc('hour',ended_at),'yyyy-MM-dd HH:mm:ss') as enddate,
*
from bike_trip_history_delta
where start_station_name = "Broadway & W 25 St"
OR end_station_name = "Broadway & W 25 St"
"""
).createOrReplaceTempView("bike_trip_history_delta_2")
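# COMMAND ----------
# MAGIC %md
# MAGIC #### To make the hour truncation concrete, the illustrative one-liner below (not part of the pipeline) shows how date_trunc('hour', ...) buckets a sample timestamp
# COMMAND ----------
# Illustrative only: all trips within the same clock hour share one bucket key.
spark.sql("SELECT date_format(date_trunc('hour', timestamp '2021-11-17 08:43:21'), 'yyyy-MM-dd HH:mm:ss') AS hour_bucket").show()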
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT min(started_at) as started_at,max(started_at) as ended_date, count(*) as totaldata
# MAGIC FROM bike_trip_history_delta_2
# COMMAND ----------
# MAGIC %md
# MAGIC #### Here we are calculating the number of bike trips taking place in each hour. We create calculated columns holding the number of bikes that left our station in a given hour and the number that arrived at our station in that hour. We also take the net difference of the two; this net difference is what our forecast model will predict.
# COMMAND ----------
bike_trips_df = spark.sql(
"""
with cte1 as(
select startdate, sum(case when start_station_name="Broadway & W 25 St" then 1 else 0 end) tripstarted_at_our_station
from bike_trip_history_delta_2
group by startdate)
,cte2 AS (
select enddate, sum(case when end_station_name="Broadway & W 25 St" then 1 else 0 end) tripend_at_our_station
from bike_trip_history_delta_2
group by enddate)
select startdate, tripstarted_at_our_station bike_leaving_our_station, tripend_at_our_station bike_starting_our_station,
(tripend_at_our_station-tripstarted_at_our_station) as net_trip_difference
from cte1 as tab1
inner join cte2 as tab2
on tab1.startdate=tab2.enddate
order by tab1.startdate
"""
)
# COMMAND ----------
bike_trips_df.createOrReplaceTempView("bike_trips_count_delta")
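# COMMAND ----------
# MAGIC %md
# MAGIC #### A quick invariant check (sketch): every hour's net difference should equal arrivals minus departures
# COMMAND ----------
from pyspark.sql.functions import col

# Sketch: count rows violating net_trip_difference = arrivals - departures; expect 0.
violations = bike_trips_df.filter(
    col("net_trip_difference") != col("bike_starting_our_station") - col("bike_leaving_our_station")
).count()
print(f"rows violating the net-difference invariant: {violations}")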
# COMMAND ----------
# MAGIC %md
# MAGIC #### Spot-checking individual dates to verify that the logic above behaves as expected
# COMMAND ----------
# MAGIC %sql
# MAGIC select * from
# MAGIC bike_trip_history_delta_2
# MAGIC where date(started_at) = "2021-11-17"
# MAGIC and hour(started_at) = 8
# MAGIC and start_station_name="Broadway & W 25 St"
# MAGIC order by started_at
# COMMAND ----------
# MAGIC %sql
# MAGIC select * from bike_trip_history_delta_2
# MAGIC order by startdate
# COMMAND ----------
# MAGIC %md
# MAGIC #### Creating a silver table from the raw bronze bike trips data. This silver bike-trip count table holds the number of trips taking place in each hour.
# COMMAND ----------
delta_table_name = 'biketrips_count_g07'
bike_trips_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").option("path", GROUP_DATA_PATH + "silver"+ delta_table_name).saveAsTable(delta_table_name)
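# COMMAND ----------
# MAGIC %md
# MAGIC #### A quick read-back (sketch): loading the silver table by name to confirm it registered correctly
# COMMAND ----------
# Sketch: the table should be queryable by name after saveAsTable.
display(spark.table(delta_table_name).orderBy("startdate").limit(5))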
# COMMAND ----------
# MAGIC %md
# MAGIC #### Reading the weather data from the bronze path and creating a SQL temp view for querying it
# COMMAND ----------
delta_path = "dbfs:/FileStore/tables/G07/bronze/weather_history"
spark.read.format("delta").load(delta_path).createOrReplaceTempView("weather_history_delta")
# COMMAND ----------
# MAGIC %md
# MAGIC #### The query below checks for duplicates in the weather data, along with other weather-related analysis
# COMMAND ----------
# MAGIC %sql
# MAGIC -- SELECT from_unixtime(dt) as abc,count(*)
# MAGIC -- FROM weather_history_delta
# MAGIC -- GROUP BY from_unixtime(dt)
# MAGIC -- HAVING count(*)>1
# MAGIC
# MAGIC --the following weather timestamps have duplicates in the data
# MAGIC -- 2022-10-30 16:00:00
# MAGIC -- 2022-10-30 17:00:00
# MAGIC -- 2022-10-30 19:00:00
# MAGIC -- 2022-10-30 18:00:00
# MAGIC
# MAGIC SELECT from_unixtime(dt) as dt,*
# MAGIC FROM weather_history_delta
# MAGIC where DATE(from_unixtime(dt)) = '2022-06-04'
# MAGIC order by dt
# COMMAND ----------
# MAGIC %md
# MAGIC #### Creating a duplicate-free, clean weather table. This step also registers the SQL temp view "clean_weather_history_delta".
# COMMAND ----------
clean_weather_df = spark.sql(
"""
with cte as(
SELECT from_unixtime(dt) as parsed_dt,*,
row_number() over (PARTITION BY from_unixtime(dt) ORDER BY id desc) as rnum
FROM weather_history_delta
--where from_unixtime(dt) = '2022-10-30 19:00:00'
)
select parsed_dt, temp, feels_like, pressure, humidity, dew_point, uvi, clouds, visibility, wind_speed, wind_deg, pop, snow_1h, id, main, description, icon, loc, lat, lon, timezone, timezone_offset
from cte
where rnum=1
"""
)
clean_weather_df.createOrReplaceTempView("clean_weather_history_delta")
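# COMMAND ----------
# MAGIC %md
# MAGIC #### A quick check (sketch): after keeping rnum = 1, each parsed_dt should appear exactly once
# COMMAND ----------
# Sketch: count hours that still appear more than once; expect 0.
dupes = clean_weather_df.groupBy("parsed_dt").count().filter("count > 1").count()
print(f"duplicate weather hours remaining: {dupes}")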
# COMMAND ----------
# MAGIC %md
# MAGIC #### The following few code chunks analyze the weather data to decide how to preprocess it
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT min(parsed_dt) as started_at,max(parsed_dt) as ended_date, count(*)
# MAGIC FROM clean_weather_history_delta
# COMMAND ----------
# MAGIC %sql
# MAGIC --- Checking lon, lat for weather station
# MAGIC SELECT DISTINCT lat, lon, timezone, timezone_offset, loc
# MAGIC FROM clean_weather_history_delta
# COMMAND ----------
# MAGIC %sql
# MAGIC --checking if all weather data is available or we are getting any NULLS
# MAGIC SELECT min(tab1.started_at), max(tab1.started_at)
# MAGIC FROM bike_trip_history_delta as tab1
# MAGIC LEFT JOIN clean_weather_history_delta as tab2
# MAGIC ON DATE(tab1.started_at) = DATE(parsed_dt)
# MAGIC AND HOUR(tab1.started_at) = HOUR(parsed_dt)
# MAGIC WHERE tab2.parsed_dt is NULL
# COMMAND ----------
# MAGIC %sql
# MAGIC --checking if all weather data is available or we are getting any NULLS
# MAGIC SELECT distinct(DATE(tab1.started_at))
# MAGIC FROM bike_trip_history_delta as tab1
# MAGIC LEFT JOIN clean_weather_history_delta as tab2
# MAGIC ON DATE(tab1.started_at) = DATE(parsed_dt)
# MAGIC AND HOUR(tab1.started_at) = HOUR(parsed_dt)
# MAGIC WHERE start_station_name = 'Broadway & W 25 St'
# MAGIC AND tab2.parsed_dt is null
# MAGIC AND DATE(tab1.started_at) >= '2021-11-20'
# MAGIC ORDER BY DATE(started_at)
# COMMAND ----------
# MAGIC %md
# MAGIC #### The following code pulls weather data from the OpenWeather history API. The idea is to backfill the missing weather hours identified above, which are useful for building the forecasting model.
# COMMAND ----------
import requests

# Historical weather for NYC from the OpenWeather history API.
BASE_URL = "https://history.openweathermap.org/data/2.5/history/city"
API_KEY = "10db4449c9624126b288cedc8a5cca2d"
common = {"lat": "40.712", "lon": "-74.006", "type": "hour", "appid": API_KEY}

# First missing window: 2022-11-01 00:00 to 2022-11-08 00:00 UTC.
response = requests.get(BASE_URL, params={**common, "start": 1667260800, "end": 1667865600}).json()
# Second missing window: 2022-06-04 17:00 to 2022-06-04 21:00 UTC.
response2 = requests.get(BASE_URL, params={**common, "start": 1654362000, "end": 1654376400}).json()
# COMMAND ----------
# MAGIC %md
# MAGIC #### The following code parses the JSON weather responses received from the API
# COMMAND ----------
loc="NYC"
lat=40.712
lon=-74.006
timezone="America/New_York"
timezone_offset=-14400
list_weather = list()
for i in range(len(response['list'])):
dt=0
temp=0
feels_like=0
pressure=0
humidity=0
dew_point=0
uvi=0
clouds=0
visibility=0
wind_speed=0
wind_deg=0
pop=0
snow_1h=0
id=0
main=0
description=0
icon=0
dt = response['list'][i]['dt']
temp = response['list'][i]['main']['temp']+0.0
feels_like = response['list'][i]['main']['feels_like']
pressure = response['list'][i]['main']['pressure']
humidity = response['list'][i]['main']['humidity']
dew_point=0.0
uvi=0.0
clouds = response['list'][i]['clouds']['all']
wind_speed = response['list'][i]['wind']['speed']+0.0
wind_deg = response['list'][i]['wind']['deg']
pop=0.0
snow_1h=0.0
id = response['list'][i]['weather'][0]['id']
main = response['list'][i]['weather'][0]['main']
description = response['list'][i]['weather'][0]['description']
icon = response['list'][i]['weather'][0]['icon']
visibility=0
list_weather.append([dt,temp,feels_like,pressure,humidity,dew_point,uvi,clouds,visibility,wind_speed,wind_deg,pop,snow_1h,id,main,description,icon,
loc,lat,lon,timezone,timezone_offset])
for i in range(len(response2['list'])):
dt=0
temp=0
feels_like=0
pressure=0
humidity=0
dew_point=0
uvi=0
clouds=0
visibility=0
wind_speed=0
wind_deg=0
pop=0
snow_1h=0
id=0
main=0
description=0
icon=0
dt = response2['list'][i]['dt']
temp = response2['list'][i]['main']['temp']+0.0
feels_like = response2['list'][i]['main']['feels_like']+0.0
pressure = response2['list'][i]['main']['pressure']
humidity = response2['list'][i]['main']['humidity']
dew_point=0.0
uvi=0.0
clouds = response2['list'][i]['clouds']['all']
wind_speed = response2['list'][i]['wind']['speed']+0.0
wind_deg = response2['list'][i]['wind']['deg']
pop=0.0
snow_1h=0.0
id = response2['list'][i]['weather'][0]['id']
main = response2['list'][i]['weather'][0]['main']
description = response2['list'][i]['weather'][0]['description']
icon = response2['list'][i]['weather'][0]['icon']
visibility=0
list_weather.append([dt,temp,feels_like,pressure,humidity,dew_point,uvi,clouds,visibility,wind_speed,wind_deg,pop,snow_1h,id,main,description,icon,
loc,lat,lon,timezone,timezone_offset])
# COMMAND ----------
# MAGIC %md
# MAGIC #### Creating a Spark DataFrame from the newly pulled weather data so it can be added to the history table
# COMMAND ----------
weather_api_df = spark.createDataFrame(data=list_weather, schema = weather_schema)
display(weather_api_df)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Creating a temp view for the API data created in the previous step, and checking the distinct timestamps pulled from the weather API to make sure we retrieved the required hours
# COMMAND ----------
weather_api_df.createOrReplaceTempView("weather_api_df_delta")
# COMMAND ----------
# MAGIC %sql
# MAGIC select distinct(from_unixtime(dt)) as dt
# MAGIC FROM weather_api_df_delta
# MAGIC order by dt
# COMMAND ----------
# MAGIC %md
# MAGIC #### Here we convert the weather epoch timestamps to a proper datetime format for further use. The code chunk after that creates the union of the history weather data and the API-pulled weather data.
# COMMAND ----------
weather_api_df_stg1 = spark.sql(
"""
with cte as(
SELECT from_unixtime(dt) as parsed_dt,*
FROM weather_api_df_delta
)
select parsed_dt, temp, feels_like, pressure, humidity, dew_point, uvi, clouds, visibility, wind_speed, wind_deg, pop, snow_1h, id, main, description, icon, loc, lat, lon, timezone, timezone_offset
from cte
"""
)
# COMMAND ----------
weather_final_df = clean_weather_df.union(weather_api_df_stg1)
weather_final_df.createOrReplaceTempView("weather_final_df_delta")
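# COMMAND ----------
# MAGIC %md
# MAGIC #### Note that DataFrame.union keeps duplicates (UNION ALL semantics), which is why the next step deduplicates. The sketch below makes the row accounting visible.
# COMMAND ----------
# Sketch: the union row count should equal history rows + API rows exactly.
print(clean_weather_df.count(), weather_api_df_stg1.count(), weather_final_df.count())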
# COMMAND ----------
# MAGIC %md
# MAGIC #### Removing any duplicate timestamps from the combined weather data, registering a SQL temp view for further use, and then checking whether any duplicates persist
# COMMAND ----------
clean_weather_final_df_delta = spark.sql(
"""
with cte as(
SELECT *,
row_number() over (PARTITION BY parsed_dt ORDER BY id desc) as rnum
FROM weather_final_df_delta
)
select parsed_dt, temp, feels_like, pressure, humidity, dew_point, uvi, clouds, visibility, wind_speed, wind_deg, pop, snow_1h, id, main, description, icon, loc, lat, lon, timezone, timezone_offset
from cte
where rnum=1
"""
)
clean_weather_final_df_delta.createOrReplaceTempView("clean_weather_final_df_delta_delta")
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT parsed_dt as abc,count(*)
# MAGIC FROM clean_weather_final_df_delta_delta
# MAGIC GROUP BY parsed_dt
# MAGIC HAVING count(*)>1
# COMMAND ----------
# MAGIC %md
# MAGIC ### Merging the bike trip count data with the weather data created above, then saving the result as silverbike_weather_g07. We also create a separate file, silverweather_imputed_g07, holding all weather data, both history and API-pulled. The first two code chunks analyze how the merge will behave and whether null weather appears only in the expected date range, i.e. 2021-11-01 to 2021-11-19.
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT *
# MAGIC FROM bike_trips_count_delta as tab1
# MAGIC LEFT JOIN clean_weather_final_df_delta_delta as tab2
# MAGIC ON tab1.startdate = tab2.parsed_dt
# MAGIC order by startdate
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT min(tab1.startdate), max(tab1.startdate)
# MAGIC FROM bike_trips_count_delta as tab1
# MAGIC LEFT JOIN clean_weather_final_df_delta_delta as tab2
# MAGIC ON tab1.startdate = tab2.parsed_dt
# MAGIC WHERE tab2.parsed_dt is NULL
# COMMAND ----------
bike_weather_df=spark.sql("""SELECT *
FROM bike_trips_count_delta as tab1
LEFT JOIN clean_weather_final_df_delta_delta as tab2
ON tab1.startdate = tab2.parsed_dt
"""
)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Writing the silver tables in Delta format; Z-ordering on the columns used to join with other tables is applied right after (see the OPTIMIZE cell below)
# COMMAND ----------
##creating a silver table which is merged of weather and bike trip
delta_table_name = 'bike_weather_g07'
bike_weather_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").option("path", GROUP_DATA_PATH + "silver"+ delta_table_name).saveAsTable(delta_table_name)
# COMMAND ----------
##creating a silver table only for weather with api imputed data to use during EDA
delta_table_name = 'weather_imputed_g07'
clean_weather_final_df_delta.write.format("delta").mode("overwrite").option("overwriteSchema", "true").option("path", GROUP_DATA_PATH + "silver"+ delta_table_name).saveAsTable(delta_table_name)
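# COMMAND ----------
# MAGIC %md
# MAGIC #### Z-ordering the two silver tables on their join keys. Delta writes do not take a Z-order option ("zOrderByCol" is silently ignored); Z-ordering is applied with OPTIMIZE ... ZORDER BY once the tables exist.
# COMMAND ----------
# Cluster each silver table's files on its join key so the joins above prune files efficiently.
spark.sql("OPTIMIZE bike_weather_g07 ZORDER BY (startdate)")
spark.sql("OPTIMIZE weather_imputed_g07 ZORDER BY (parsed_dt)")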
# COMMAND ----------
# MAGIC %md
# MAGIC #### Getting the Realtime data from the three other Bronze Tables and preparing corresponding silver tables
# COMMAND ----------
files=dbutils.fs.ls("dbfs:/FileStore/tables/G07/")
for file in files:
print(file.name)
# COMMAND ----------
read_path_rt_stationinfo = BRONZE_STATION_INFO_PATH
read_path_rt_stationstatus = BRONZE_STATION_STATUS_PATH
read_path_rt_weather = BRONZE_NYC_WEATHER_PATH
# COMMAND ----------
# MAGIC %md
# MAGIC #### Pulling the station info/status bronze data for our station and preparing a silver table from it
# COMMAND ----------
stationinfo_real_time = spark.read.format("delta").option("ignoreChanges", "true").load(read_path_rt_stationinfo)
stationstatus_real_time = spark.read.format("delta").option("ignoreChanges", "true").load(read_path_rt_stationstatus)
stationinfo_real_time.createOrReplaceTempView('stationinfo_delta')
stationstatus_real_time.createOrReplaceTempView('stationstatus_delta')
# COMMAND ----------
# MAGIC %sql
# MAGIC -- select count(*) from stationinfo_delta --1907
# MAGIC select * from stationinfo_delta
# MAGIC where station_id = "daefc84c-1b16-4220-8e1f-10ea4866fdc7"
# COMMAND ----------
# MAGIC %sql
# MAGIC -- select count(*) from stationstatus_delta --4006993
# MAGIC select from_unixtime(last_reported) as date,num_bikes_available,num_bikes_disabled,*
# MAGIC from stationstatus_delta
# MAGIC where station_id = "daefc84c-1b16-4220-8e1f-10ea4866fdc7"
# MAGIC order by from_unixtime(last_reported) desc
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Preparing the Silver Table for Realtime Station Info Data with UTC Conversion
# COMMAND ----------
realtime_bike_status = spark.sql(
"""
WITH CTE AS(
select sid.station_id,name,region_id,short_name,lat,lon,from_unixtime(last_reported) as last_reported,capacity,num_bikes_available,
num_docks_available,is_installed,num_bikes_disabled,station_status, (capacity-num_bikes_available-num_bikes_disabled) net_availability,
ROW_NUMBER() OVER (PARTITION BY DATE(from_unixtime(last_reported)), HOUR(from_unixtime(last_reported)) ORDER BY from_unixtime(last_reported) DESC) AS RNUM
from stationinfo_delta sid
right join stationstatus_delta ssd
on sid.station_id = ssd.station_id
where sid.name = "Broadway & W 25 St"
)
,cte2 as (
SELECT
date_format(date_trunc('hour',last_reported),'yyyy-MM-dd HH:mm:ss') as last_reported_hour ,capacity,
ifnull(LAG(num_bikes_available) OVER(ORDER BY date_format(date_trunc('hour',last_reported),'yyyy-MM-dd HH:mm:ss')),0) num_bikes_available,
num_docks_available,num_bikes_disabled,net_availability
FROM CTE WHERE RNUM=1
)
select date_format(from_utc_timestamp(last_reported_hour,'GMT-5'),'yyyy-MM-dd HH:mm:ss') as last_reported_hour_est,
*, num_bikes_available - ifnull(LAG(num_bikes_available) OVER(ORDER BY last_reported_hour),0) net_difference
FROM cte2
order by last_reported_hour
"""
)
realtime_bike_status.createOrReplaceTempView('realtime_bike_status_delta')
display(realtime_bike_status)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Saving Realtime bike status as a Silver Table with required columns.
# COMMAND ----------
delta_table_name = "realtime_bike_status"
realtime_bike_status.write.format("delta").mode("overwrite").option("overwriteSchema", "true").option("path", GROUP_DATA_PATH + "silver"+ delta_table_name).saveAsTable(delta_table_name)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Preparing Real Time Bike-Weather Merged Silver Table Data. Handling UTC conversion.
# COMMAND ----------
#reading realtime weather
weather_real_time = spark.read.format("delta").option("ignoreChanges", "true").load(read_path_rt_weather)
weather_real_time.createOrReplaceTempView('weather_real_time_delta')
display(weather_real_time)
# COMMAND ----------
realtime_bike_weather = spark.sql(
"""
select last_reported_hour as startdate, num_bikes_available, net_difference,
from_unixtime(dt) as parsed_date,temp,feels_like, pressure, humidity, dew_point, uvi, clouds, visibility, wind_speed, wind_deg, pop, weather.main[0] as main,weather.description[0] as description
from realtime_bike_status_delta as stg1
LEFT JOIN weather_real_time_delta as stg2
on stg1.last_reported_hour = from_unixtime(stg2.dt)
where stg2.temp is not null
order by stg1.last_reported_hour
""")
# COMMAND ----------
display(realtime_bike_weather)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Saving Realtime bike and weather data merged as a Silver Table with required columns
# COMMAND ----------
delta_table_name = "realtime_bike_weather_merged"
realtime_bike_weather.write.format("delta").mode("overwrite").option("overwriteSchema", "true").option("path", GROUP_DATA_PATH + "silver"+ delta_table_name).saveAsTable(delta_table_name)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Parsing the realtime weather-only data for use in future prediction and saving it as a silver table with only the required columns. The timestamps are converted from UTC.
# COMMAND ----------
weather_real_time_filtered = spark.sql(
"""
select date_format(from_utc_timestamp(from_unixtime(dt),'GMT-5'),'yyyy-MM-dd HH:mm:ss') as startdate,
temp, humidity
from weather_real_time_delta
order by startdate
""")
delta_table_name = "weather_real_time_filtered"
weather_real_time_filtered.write.format("delta").mode("overwrite").option("overwriteSchema", "true").option("path", GROUP_DATA_PATH + "silver"+ delta_table_name).saveAsTable(delta_table_name)
# COMMAND ----------
import json
# Return Success
dbutils.notebook.exit(json.dumps({"exit_code": "OK"}))