
Weather Data

Here's the code I have that will extract various subsets of the GHCN weather data.

Running this will take time, so please avoid running it when the cluster is busy with assignment work. Make sure you have access to the web frontend of your job so you can monitor its progress. It should take 5 minutes or so, depending on the details of your filters.
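For example, assuming you save the code below as ghcn_extract.py (the filename is up to you), it can be submitted like any other Spark job:

spark-submit ghcn_extract.py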

import sys
from pyspark.sql import SparkSession, functions, types, Row

spark = SparkSession.builder.appName('GHCN extractor').getOrCreate()

ghcn_path = '/courses/datasets/ghcn-splits'
ghcn_stations = '/courses/datasets/ghcn-more/ghcnd-stations.txt'
output = 'ghcn-subset'


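# fields of the GHCN daily .csv records, in order: station ID,
# date (yyyymmdd), element/observation code, value, measurement flag,
# quality flag, source flag, observation time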
observation_schema = types.StructType([
    types.StructField('station', types.StringType(), False),
    types.StructField('date', types.StringType(), False),  # becomes a types.DateType in the output
    types.StructField('observation', types.StringType(), False),
    types.StructField('value', types.IntegerType(), False),
    types.StructField('mflag', types.StringType(), False),
    types.StructField('qflag', types.StringType(), False),
    types.StructField('sflag', types.StringType(), False),
    types.StructField('obstime', types.StringType(), False),
])


station_schema = types.StructType([
    types.StructField('station', types.StringType(), False),
    types.StructField('latitude', types.FloatType(), False),
    types.StructField('longitude', types.FloatType(), False),
    types.StructField('elevation', types.FloatType(), False),
    types.StructField('name', types.StringType(), False),
])


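# ghcnd-stations.txt is fixed-width: the slices below pick out the
# station ID, latitude, longitude, elevation, and station name columns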
def station_data(line):
    return [line[0:11].strip(), float(line[12:20]), float(line[21:30]), float(line[31:37]), line[41:71].strip()]


def main():
    sc = spark.sparkContext
    
    ## Stations data...
    stations_rdd = sc.textFile(ghcn_stations).map(station_data)
    stations = spark.createDataFrame(stations_rdd, schema=station_schema).hint('broadcast')

    ## Observations data...
    obs = spark.read.csv(ghcn_path, header=None, schema=observation_schema)

    ## Filter as we like...
    # keep only some years: this is still a string comparison, so the
    # '2015' upper bound excludes 2015 itself ('20150101' sorts after '2015')
    obs = obs.filter((obs['date'] >= '2012') & (obs['date'] <= '2015'))
    
    obs = obs.filter(functions.isnull(obs['qflag']))
    obs = obs.drop('mflag', 'qflag', 'sflag', 'obstime')
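    # keep only the observation types we want; note that GHCN values are
    # scaled integers: TMAX is tenths of a degree C, PRCP is tenths of a mm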
    obs = obs.filter(obs['observation'].isin('TMAX', 'PRCP'))

    # parse the date string into a real date object    
    obs = obs.withColumn('newdate', functions.to_date(obs['date'], 'yyyyMMdd'))
    obs = obs.drop('date').withColumnRenamed('newdate', 'date')   
    
    # optional, if you want the station data joined...
    #obs = obs.join(stations, on='station')
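    # (the 'broadcast' hint on stations above tells Spark to send the small
    # stations table to every executor, instead of shuffling the observations)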

    obs.write.json(output + '/ghcn', mode='overwrite', compression='gzip')
    

main()

From there, you can have a look at the data and make sure everything seems reasonable (you should see a _SUCCESS marker and a number of part-*.json.gz files):

hdfs dfs -ls ghcn-subset/ghcn

To get the data out of HDFS and back to your gateway home directory (this creates a local directory named ghcn):

hdfs dfs -copyToLocal ghcn-subset/ghcn .

And you can scp it from there to your computer and work with it as you wish.
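Once it's on your machine, here's a minimal sketch of loading the result with pandas (this assumes the local directory is named ghcn, as copied above; each part file is a gzipped JSON-lines file, and pandas infers the compression from the .gz extension):

import glob
import pandas as pd

# read each Spark output partition and stack them into one DataFrame
parts = sorted(glob.glob('ghcn/part-*.json.gz'))
weather = pd.concat((pd.read_json(p, lines=True) for p in parts), ignore_index=True)
print(weather.head())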
