Databricks
diamonds = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
)
diamonds.write.format("delta").mode("overwrite").save("/mnt/delta/diamonds")
%sql
DROP TABLE IF EXISTS diamonds;
CREATE TABLE diamonds USING DELTA LOCATION '/mnt/delta/diamonds/'
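With the Delta table registered, the same data can be read back either by path or by table name. A minimal sketch (path and table name come from the cells above; `cut` and `price` are columns in the diamonds dataset):

```python
# Read the Delta files back by path
diamonds_delta = spark.read.format("delta").load("/mnt/delta/diamonds")

# Or go through the registered table and aggregate
spark.table("diamonds").groupBy("cut").avg("price").show()
```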
Read CSV
csvFile = "/mnt/training/wikipedia/pageviews/pageviews_by_second.tsv"
df = (spark.read                       # The DataFrameReader
  .option("header", "true")            # Use first line of all files as header
  .option("sep", "\t")                 # Use tab delimiter (default is comma-separator)
  .option("inferSchema", "true")       # Automatically infer data types
  .csv(csvFile)                        # Creates a DataFrame from CSV after reading in the file
)
df.printSchema()                       # Print the DataFrame's schema
from pyspark.sql.types import *
csvSchema = StructType([
StructField("timestamp", StringType(), False),
StructField("site", StringType(), False),
StructField("requests", IntegerType(), False)
])
df = (spark.read                       # The DataFrameReader
  .option("header", "true")            # Ignore line #1 - it's a header
  .option("sep", "\t")                 # Use tab delimiter (default is comma-separator)
  .schema(csvSchema)                   # Use the specified schema
  .csv(csvFile)                        # Creates a DataFrame from CSV after reading in the file
)
df.printSchema()                       # Print the DataFrame's schema
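Supplying `csvSchema` up front avoids the full pass over the file that `inferSchema` performs, since the column names and types are already known. A quick sanity check on the resulting DataFrame (column names come from `csvSchema` above):

```python
from pyspark.sql import functions as F

# Row count and number of distinct sites as a basic sanity check
df.select(
    F.count("*").alias("rows"),
    F.countDistinct("site").alias("distinct_sites")
).show()
```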
Read JSON
%fs ls dbfs:/mnt/training/wikipedia/edits/snapshot-2016-05-26.json
%fs head dbfs:/mnt/training/wikipedia/edits/snapshot-2016-05-26.json
jsonFile = "dbfs:/mnt/training/wikipedia/edits/snapshot-2016-05-26.json"
wikiEditsDF = (spark.read # The DataFrameReader
.option("inferSchema", "true") # Automatically infer data types & column names
.json(jsonFile) # Creates a DataFrame from JSON after reading in the file
)
wikiEditsDF.printSchema()
wikiEditsDF.createOrReplaceTempView("wiki_edits")
# Schema
from pyspark.sql.types import *
jsonSchema = StructType([
StructField("channel", StringType(), True),
StructField("comment", StringType(), True),
StructField("delta", IntegerType(), True),
StructField("flag", StringType(), True),
StructField("geocoding", StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("countryCode2", StringType(), True),
StructField("countryCode3", StringType(), True),
StructField("stateProvince", StringType(), True),
StructField("latitude", DoubleType(), True),
StructField("longitude", DoubleType(), True)
]), True),
StructField("isAnonymous", BooleanType(), True),
StructField("isNewPage", BooleanType(), True),
StructField("isRobot", BooleanType(), True),
StructField("isUnpatrolled", BooleanType(), True),
StructField("namespace", StringType(), True),
StructField("page", StringType(), True),
StructField("pageURL", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("url", StringType(), True),
StructField("user", StringType(), True),
StructField("userURL", StringType(), True),
StructField("wikipediaURL", StringType(), True),
StructField("wikipedia", StringType(), True)
])
df = (spark.read                       # The DataFrameReader
  .schema(jsonSchema)                  # Use the specified schema
  .json(jsonFile)                      # Creates a DataFrame from JSON after reading in the file
)
df.printSchema()                       # Print the DataFrame's schema
%sql
SELECT * FROM wiki_edits
%sql
SELECT channel, page, geocoding.city, geocoding.latitude, geocoding.longitude
FROM wiki_edits
WHERE geocoding.city IS NOT NULL
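The same nested fields can be reached from the DataFrame API using dot notation on the `geocoding` struct column. A minimal Python sketch equivalent to the SQL cell above:

```python
from pyspark.sql import functions as F

(wikiEditsDF
  .select("channel", "page",
          F.col("geocoding.city").alias("city"),
          F.col("geocoding.latitude").alias("latitude"),
          F.col("geocoding.longitude").alias("longitude"))
  .where(F.col("geocoding.city").isNotNull())
  .show(10, truncate=False))
```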
Read Parquet
```
%fs ls /mnt/training/wikipedia/pagecounts/staging_parquet_en_only_clean/
```
parquetFile = "/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet/"
(spark.read # The DataFrameReader
.parquet(parquetFile) # Creates a DataFrame from Parquet after reading in the file
.printSchema() # Print the DataFrame's schema
)
from pyspark.sql.types import *
parquetSchema = StructType(
[
StructField("timestamp", StringType(), False),
StructField("site", StringType(), False),
StructField("requests", IntegerType(), False)
]
)
(spark.read # The DataFrameReader
.schema(parquetSchema) # Use the specified schema
.parquet(parquetFile) # Creates a DataFrame from Parquet after reading in the file
.printSchema() # Print the DataFrame's schema
)
parquetDF = spark.read.schema(parquetSchema).parquet(parquetFile)
parquetDF.rdd.getNumPartitions()
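`getNumPartitions()` reports how many partitions the Parquet read produced, which determines the parallelism of downstream stages. If that count needs adjusting before a write, `repartition` (or `coalesce` to shrink without a full shuffle) can be applied first; a sketch with a hypothetical output path and an illustrative target of 8 partitions:

```python
# Hypothetical output path; 8 partitions is only an illustrative target
(parquetDF
  .repartition(8)
  .write
  .mode("overwrite")
  .parquet("/tmp/pageviews_by_second_repartitioned"))
```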
%sql
DROP TABLE IF EXISTS diamonds;
CREATE TABLE diamonds
USING csv
OPTIONS (path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header "true")
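Once the CSV-backed table exists, it can be inspected from Python as well. A minimal sketch, assuming the table was created as in the cell above:

```python
# Load the table by name and take a quick look
diamonds_csv = spark.table("diamonds")
diamonds_csv.printSchema()
diamonds_csv.limit(5).show()
```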