[Databricks] Create a delta table
For this guide, you will need to download a list of tickers available below:
Import data through DBFS and create a delta table
Once everything is installed, we can import the CSV file into Databricks. Go to Catalog > Add > Add data:
Next, select DBFS under Native integrations:
Next, select your file and choose how you want to create the table. You will see a preview of the table, which you can either create directly from the UI or create from a notebook as a delta table:
Warning
If you select the Create Table with UI option, you will need to start a cluster to provide compute. Without a running cluster, you will have to create the table from a notebook instead (as shown after the list of table attributes below).
You should specify the following table attributes:
- Table Name: ticker_listing
- Database: 'default'
- File Type: CSV
- Column Delimiter: ';'
- Check 'First row is header'
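If you go the notebook route instead, a minimal sketch looks like the following. The DBFS path is an assumption (files uploaded through the UI typically land under /FileStore/tables/), so adjust it to wherever your file was actually stored:
# Minimal sketch of the notebook approach; adjust the DBFS path to your upload location
df = (spark.read
      .option("header", "true")    # 'First row is header'
      .option("delimiter", ";")    # column delimiter ';'
      .csv("dbfs:/FileStore/tables/ticker_listing.csv"))

# Save it as a managed delta table in the 'default' database
df.write.format("delta").saveAsTable("default.ticker_listing")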
Create a delta table from a pandas DataFrame
Before we start, create a new notebook within your Databricks workspace. It should be inside Repos > YourMailAddress > NameOfYourRepository.
Select a delta table with SQL & PySpark
%sql
SELECT
symbol, name, exchange, assetType, ipoDate, delistingDate
FROM
default.ticker_listing
WHERE
symbol IN ('AAPL', 'GOOGL', 'AMZN', 'MSFT')
sp_ticker_list = spark.sql('''
SELECT DISTINCT
symbol
FROM
default.ticker_listing
WHERE
symbol IN ('AAPL', 'GOOGL', 'AMZN', 'MSFT')
''')
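To check what the query returned before moving on, you can preview the resulting Spark DataFrame:
# Preview the distinct ticker symbols returned by the query above
display(sp_ticker_list)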
Make an API call with Python to fetch data
import requests
import pandas as pd
import json
import os
from datetime import datetime
pd.set_option('display.max_columns', 500)
# Set your Alpha Vantage API Key
AV_API_Key='YOURAPIKEY'
# Convert the single-column PySpark DataFrame to pandas and store its values as a list (each entry is a one-element list, hence ticker[0] in the loop below)
ticker_list = sp_ticker_list.toPandas().values.tolist()
# Initialize empty DataFrames
ticker_time_series = pd.DataFrame()
ticker_metadata = pd.DataFrame()
# Loop over the ticker list
for ticker in ticker_list:
    r_stock = requests.get('https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&outputsize=full&symbol={ticker}&apikey={apiKey}'.format(apiKey=AV_API_Key, ticker=ticker[0]))
    js_stock = r_stock.json()
    try:
        # Ticker time series to DataFrame
        df_time_series = pd.DataFrame(js_stock['Time Series (Daily)'])
        df_time_series['Ticker'] = ticker[0]
        ticker_time_series = pd.concat([ticker_time_series, df_time_series])
        # Ticker metadata to DataFrame
        df_metadata = pd.json_normalize(js_stock['Meta Data'])
        df_metadata['Ticker'] = ticker[0]
        df_metadata['Status'] = 'active'
        df_metadata['Date'] = datetime.now()
    except Exception:
        # No time series returned for this ticker: record it as inactive
        df_metadata = pd.DataFrame({"Ticker": [ticker[0]], "Status": ["inactive"], "Date": [datetime.now()]})
    ticker_metadata = pd.concat([ticker_metadata, df_metadata])
display(ticker_metadata.head())
ticker_time_series = ticker_time_series.reset_index().rename({'index': 'Value_type'}, axis=1)
lg_ticker_ts = ticker_time_series.melt(id_vars=['Ticker', 'Value_type'])
display(lg_ticker_ts.head())
Create a temporary table to query your DataFrame with SQL
sp_ticker_ts=spark.createDataFrame(lg_ticker_ts)
sp_ticker_ts.distinct().createOrReplaceTempView('sp_ticker_ts')
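You can now query the temporary view with SQL. After the melt, the columns are Ticker, Value_type, variable (the date) and value (the price), following pandas' default melt naming:
%sql
SELECT Ticker, Value_type, variable, value
FROM sp_ticker_ts
LIMIT 10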
Create new schemas and delta tables with Spark
%sql
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;
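The statements below create the tables using a three-level name (catalog.schema.table); vincent_intro_cloud is the catalog used in this workspace. If your schemas were created in a different catalog, you can point the session at the right one first. A minimal sketch, assuming Unity Catalog is enabled:
%sql
-- Optional: select the target catalog and check that the schemas exist
USE CATALOG vincent_intro_cloud;
SHOW SCHEMAS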
# ticker time series value
sp_ticker_ts=spark.createDataFrame(lg_ticker_ts)
sp_ticker_ts.distinct().createOrReplaceTempView('temp_sp_ticker_ts')
spark.sql('''
CREATE TABLE IF NOT EXISTS vincent_intro_cloud.bronze.ticker_value
USING DELTA
AS SELECT * FROM temp_sp_ticker_ts
''')
# ticker metadata
ticker_metadata = ticker_metadata.rename(columns={"1. Information": "Information", "2. Symbol": "Symbol", "3. Last Refreshed": "Last_Refreshed", "4. Output Size" : "Output_Size", "5. Time Zone" : "Time_Zone"})
sp_ticker_metadata = spark.createDataFrame(ticker_metadata)
sp_ticker_metadata.distinct().createOrReplaceTempView('temp_sp_ticker_metadata')
spark.sql('''
CREATE TABLE IF NOT EXISTS vincent_intro_cloud.bronze.ticker_metadata
USING DELTA
AS SELECT * FROM temp_sp_ticker_metadata
''')
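Once both statements have run, you can query the new delta tables directly to confirm the data landed as expected; the same check works for ticker_metadata:
%sql
SELECT * FROM vincent_intro_cloud.bronze.ticker_value LIMIT 10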