# Databricks

```python
diamonds = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
)

diamonds.write.format("delta").mode("overwrite").save("/mnt/delta/diamonds")
```

```sql
DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds USING DELTA LOCATION '/mnt/delta/diamonds/'
```

### Read CSV

```python
csvFile = "/mnt/training/wikipedia/pageviews/pageviews_by_second.tsv"

df = (spark.read                        # The DataFrameReader
   .option("header", "true")       # Use first line of all files as header
   .option("sep", "\t")            # Use tab delimiter (default is comma-separator)
   .option("inferSchema", "true")  # Automatically infer data types
   .csv(csvFile)                   # Creates a DataFrame from CSV after reading in the file
   .printSchema()
)

from pyspark.sql.types import *

csvSchema = StructType([
  StructField("timestamp", StringType(), False),
  StructField("site", StringType(), False),
  StructField("requests", IntegerType(), False)
])
df = (spark.read                   # The DataFrameReader
  .option('header', 'true')   # Ignore line #1 - it's a header
  .option('sep', "\t")        # Use tab delimiter (default is comma-separator)
  .schema(csvSchema)          # Use the specified schema
  .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
  .printSchema()
)
```

### Read Json

```
%fs ls dbfs:/mnt/training/wikipedia/edits/snapshot-2016-05-26.json
%fs head dbfs:/mnt/training/wikipedia/edits/snapshot-2016-05-26.json
```

```python
jsonFile = "dbfs:/mnt/training/wikipedia/edits/snapshot-2016-05-26.json"

wikiEditsDF = (spark.read           # The DataFrameReader
    .option("inferSchema", "true")  # Automatically infer data types & column names
    .json(jsonFile)                 # Creates a DataFrame from JSON after reading in the file
 )
wikiEditsDF.printSchema()

wikiEditsDF.createOrReplaceTempView("wiki_edits")


# Schema
from pyspark.sql.types import *

jsonSchema = StructType([
  StructField("channel", StringType(), True),
  StructField("comment", StringType(), True),
  StructField("delta", IntegerType(), True),
  StructField("flag", StringType(), True),
  StructField("geocoding", StructType([
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("countryCode2", StringType(), True),
    StructField("countryCode3", StringType(), True),
    StructField("stateProvince", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
  ]), True),
  StructField("isAnonymous", BooleanType(), True),
  StructField("isNewPage", BooleanType(), True),
  StructField("isRobot", BooleanType(), True),
  StructField("isUnpatrolled", BooleanType(), True),
  StructField("namespace", StringType(), True),
  StructField("page", StringType(), True),
  StructField("pageURL", StringType(), True),
  StructField("timestamp", StringType(), True),
  StructField("url", StringType(), True),
  StructField("user", StringType(), True),
  StructField("userURL", StringType(), True),
  StructField("wikipediaURL", StringType(), True),
  StructField("wikipedia", StringType(), True)
])

df = (spark.read            # The DataFrameReader
  .schema(jsonSchema)  # Use the specified schema
  .json(jsonFile)      # Creates a DataFrame from JSON after reading in the file
  .printSchema()
)
```

```sql
%sql

SELECT * FROM wiki_edits


%sql

SELECT channel, page, geocoding.city, geocoding.latitude, geocoding.longitude 
FROM wiki_edits 
WHERE geocoding.city IS NOT NULL

```

### Read Parquet

````
```
%fs ls /mnt/training/wikipedia/pagecounts/staging_parquet_en_only_clean/
```
````

```python
parquetFile = "/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet/"

(spark.read              # The DataFrameReader
  .parquet(parquetFile)  # Creates a DataFrame from Parquet after reading in the file
  .printSchema()         # Print the DataFrame's schema
)



from pyspark.sql.types import *

parquetSchema = StructType(
  [
    StructField("timestamp", StringType(), False),
    StructField("site", StringType(), False),
    StructField("requests", IntegerType(), False)
  ]
)

(spark.read               # The DataFrameReader
  .schema(parquetSchema)  # Use the specified schema
  .parquet(parquetFile)   # Creates a DataFrame from Parquet after reading in the file
  .printSchema()          # Print the DataFrame's schema
)

parquetDF = spark.read.schema(parquetSchema).parquet(parquetFile)
parquetDF.rdd.getNumPartitions()
```

```sql
DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds
USING csv
OPTIONS (path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header "true")
```

## References

* <https://docs.databricks.com/en/getting-started/quick-start.html>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.datahungry.dev/library/databricks.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
