SparkSQL tables are, at their core, collections of rows and columns. Tables also carry metadata such as the schema, description, table name, database name, column names, and the physical location where the data is stored. The metadata of a table is always maintained by Spark. Whether the data is also managed by Spark depends on whether the table that is created is
Managed
Unmanaged
Spark allows us to create the two types of tables mentioned above. For managed tables, Spark manages both the metadata and the data, with the data stored in the location specified by the Spark config variable spark.sql.warehouse.dir (default: /user/hive/warehouse). For unmanaged tables, Spark only manages the metadata, while we manage the data ourselves in an external data source, e.g. our PC's filesystem, GCS, etc. Tables can be created only under a database. Spark has a built-in database called default; if we do not explicitly create a table under a different database, the table is created under the default database.
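For reference, an unmanaged table is typically created by pointing Spark at an external location. A minimal sketch (using a SparkSession like the one built below), where the table name ext_student and the path are placeholders and the data at that path is assumed to be in Parquet format:
spark.sql("""CREATE TABLE ext_student(id INT, Name STRING)
USING parquet
LOCATION '/path/to/our/own/data'""")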
The following example sets the warehouse directory config mentioned above, which will be used to store the data and metadata:
from pyspark.sql import SparkSession

# Placeholder: replace with a path on your filesystem
warehouse = "path"

spark = SparkSession.builder.appName("Tables") \
    .config("spark.sql.warehouse.dir", warehouse) \
    .getOrCreate()
Running these statements will have no effect on our filesystem until we create some tables or databases. If we do not pass this option, any databases or tables we create end up under a directory (inside our current working directory) called spark-warehouse.
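The warehouse directory actually in effect can always be read back from the running session:
# Prints the warehouse directory this session will use
print(spark.conf.get("spark.sql.warehouse.dir"))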
Creating SQL Databases and Tables
i. Creating Databases
Creating a database is very simple and can be done by running a SQL statement with the sql method of the SparkSession object. The SQL statement can be as simple as the following:
SQL statement: CREATE DATABASE IF NOT EXISTS test_db;
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
We can check if the database is created in two different ways. The first way is to use the Catalog’s listDatabases method as follows:
print(spark.catalog.listDatabases())
O/p:
[Database(name='test_db', description='', locationUri='file:/C:/path/to/warehouse/dir/test_db.db')]
Another way to verify is to check the warehouse directory that was provided through spark.sql.warehouse.dir while building the SparkSession. If you ran the same SQL as in the example above, a folder named test_db.db will have been created there. If you used a different database name, you will see a folder with that name, ending in ".db", in the warehouse directory.
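This check can also be scripted; a small sketch assuming the warehouse variable from the earlier example points to a local filesystem path:
import os

# A database appears as a '<db_name>.db' folder inside the warehouse directory
print(os.path.isdir(os.path.join(warehouse, "test_db.db")))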
To keep things simple, only managed tables are discussed in the sections below.
ii. Creating Tables
Creating tables is just as easy as creating databases, and a simple CREATE TABLE statement looks very similar to the SQL statement that creates a database.
CREATE TABLE student(id INT, Name STRING)
spark.sql("""CREATE TABLE student(id INT, Name STRING)""")
The schema details are given along with the name of the table we wish to create. If the CREATE TABLE statement has no schema, an exception is thrown: at least when creating empty tables, the schema is mandatory.
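As a quick illustration, a CREATE TABLE statement with no column list fails; a minimal sketch that catches the error (the exact exception type and message vary across Spark versions):
try:
    # No column list and no AS SELECT to infer a schema from
    spark.sql("CREATE TABLE no_schema_table")
except Exception as e:
    print("Table creation failed:", type(e).__name__)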
We can check for the presence of the table in two different ways. The first is to call the listTables() method provided by the Spark Catalog object. It displays all the tables that are currently available in the current database, which is almost always default.
Before diving into how listTables() works, it is important to know about the current database. The current database is the one in which tables are created when the DDL does not include a database name. It can be checked using the Catalog's currentDatabase() method, which takes no arguments and returns the name of the current database.
print(spark.catalog.currentDatabase())
O/p: default
The default current database is default, but it can be changed using the following SQL statement:
'USE test_db'
spark.sql('USE test_db')
print(spark.catalog.currentDatabase())
O/p: test_db
The above statement changes the current database: any table created from here on, without an explicit mention of a different database, will be created in the database test_db. This is verified by the call to the currentDatabase() method.
The listTables() method takes one optional argument, the database name. If the argument is provided, only tables present in that database are displayed. In the following example, no argument is passed:
spark.catalog.listTables()
O/P:
[Table(name='student', database='default',
description=None, tableType='MANAGED', isTemporary=False)]
Since we did not prefix the table name with a database name when creating the above table, the student table was created in the default database, not in the test_db database created in the previous section. If we want to create the table inside a specific database, we can issue a DDL as follows:
CREATE TABLE test_db.student(id INT, Name STRING)
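As before, this is just another call to the sql method:
spark.sql("CREATE TABLE test_db.student(id INT, Name STRING)")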
The above statement creates a table under a specific database even if it is not the current database. If multiple tables need to be created within the same database, another option is to change the current database to the required one, as explained above. The following statements create five different tables under the same database.
spark.sql("USE test_db")
spark.sql("CREATE TABLE student_secA(id INT, Name STRING)")
spark.sql("CREATE TABLE student_secB(id INT, Name STRING)")
spark.sql("CREATE TABLE student_secC(id INT, Name STRING)")
spark.sql("CREATE TABLE student_secD(id INT, Name STRING)")
spark.sql("CREATE TABLE student_secE(id INT, Name STRING)")
spark.catalog.listTables() (or)
spark.catalog.listTables(dbName='test_db')
O/P:
[Table(name='student', database='test_db', description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_seca', database='test_db', description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_secb', database='test_db', description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_secc', database='test_db', description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_secd', database='test_db', description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_sece', database='test_db', description=None, tableType='MANAGED', isTemporary=False)]
By running these SQL statements, five tables were created, all of them under the same database.
Spark also provides a method to inspect tables individually: listColumns(), which lets us inspect the columns of a single table. It accepts two arguments: tableName, which is mandatory, and dbName, which is optional when the table is present in the current database. Since the current database of this session is test_db, dbName is not passed explicitly in the example below:
print(spark.catalog.listColumns(tableName='student_secC'))
O/p:
[Column(name='id', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False), Column(name='Name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
The method returns a list of Column objects, one for each column in the corresponding table. Each Column object displays metadata about the column: the name, description (a description can be added to a column at creation time), dataType, nullable (whether the column accepts nulls), isPartition (whether the column is used for partitioning), and isBucket (whether the column is used for bucketing). Most of this metadata can be declared during table creation and changed afterwards. These properties are only touched on here, to stay true to the title; a closer look at them will be covered in future articles.
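As one example of such metadata, a description can be attached to a column with a COMMENT clause at creation time, after which listColumns() reports it. A small sketch with a hypothetical table name:
spark.sql("CREATE TABLE student_commented(id INT COMMENT 'roll number', Name STRING)")
# The description field of the id column is now populated
print(spark.catalog.listColumns(tableName='student_commented'))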
iii. Inserting into Tables
Now that tables have been created, they need some data that can be used for transformations. One simple way to start inserting data is to issue an INSERT statement on the table with hardcoded values in the SQL. The basic syntax of the INSERT statement in Spark is very much the same as in standard SQL.
INSERT INTO table_name VALUES (val1 (or) null, val2 (or) null, …)
spark.sql('''INSERT INTO student_secA VALUES (1,'Eren')''')
spark.sql('select * from student_secA').show()
Issuing individual INSERT statements becomes tedious when there is a lot of data to insert, so Spark allows a single INSERT statement to carry the data for multiple rows. The syntax is as follows:
INSERT INTO table_name VALUES (row1_value1, row1_value2…),(row2_value1, row2_value2…)
spark.sql('''INSERT INTO student_secA VALUES (2,'Ace'),(3,'Sabo'), (4,'Luffy'), (5,'Gwen')''')
The only problem with inserting data using the above syntaxes is that the data has to be hardcoded into the SQL. There is an option to parametrize the SQL using some Python logic, but even then the query quickly becomes unwieldy. A better way is to have all the data that needs insertion in another table: when the source data is already in another Spark table, an INSERT statement can transfer it, optionally after filtering or transformations.
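As a side note, hardcoding can also be avoided outside SQL altogether, by building a DataFrame from a Python collection and appending it with the DataFrameWriter's insertInto method. A minimal sketch with placeholder rows, using the empty student_secD table created earlier:
# Placeholder rows; in practice these might be read from a file or computed
rows = [(1, 'Zoro'), (2, 'Nami')]
df = spark.createDataFrame(rows, schema='id INT, Name STRING')
# insertInto appends to an existing table, matching columns by position
df.write.insertInto('student_secD')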
i.) To transfer data between tables, the following syntax can be used:
INSERT INTO destination_table_name TABLE source_data_table
I have populated the table student_secA with the following data:
spark.sql('select * from student_secA').show()
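O/p (row order may vary):
+---+-----+
| id| Name|
+---+-----+
|  1| Eren|
|  2|  Ace|
|  3| Sabo|
|  4|Luffy|
|  5| Gwen|
+---+-----+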
Now using the above syntax, the data from table student_secA can be easily transferred to student_secC:
spark.sql('''INSERT INTO student_secC TABLE student_secA''')
spark.sql('''select * from student_secC''').show()
ii.) To selectively transfer data between tables, the following syntaxes can be used:
INSERT INTO destination_table FROM source_table SELECT col1,.... WHERE …
(or)
INSERT INTO destination_table select col1,.. FROM source_table WHERE …
spark.sql("""INSERT INTO student_secB FROM student_secC
SELECT * WHERE id in (1,2,3)""")
spark.sql('select * from student_secB').show()
(or), assuming some other populated table exists, say Students with a Section_ID column:
spark.sql('''INSERT INTO student_secB
SELECT row_number() OVER (ORDER BY Name), Name FROM Students
WHERE Section_ID = 1''')
spark.sql('select * from student_secB').show()
iv. Dropping and Truncating Tables
Unlike temporary views, which have Catalog methods such as dropTempView() to drop them, the Spark Catalog offers no method to drop tables; instead, Spark allows us to issue a query that drops the table. Drop can take different meanings depending on the type of table involved. When a DROP statement is issued on a managed table, Spark deletes both the metadata and the data (the files under spark.sql.warehouse.dir). When it is issued on an unmanaged (or external) table, Spark deletes only the metadata that was registered in the Catalog. The basic syntaxes for the DROP statement look like the following:
DROP TABLE table_name
(or)
DROP TABLE IF EXISTS table_name
The first statement removes the table if it is present and throws an error when an invalid table name is provided. The second statement safeguards against table-not-found errors when the table name passed is invalid.
spark.sql('''DROP TABLE student_secA''')
print('Removed once, trying again')
spark.sql('''DROP TABLE student_secA''')
O/p:
Removed once, trying again
AnalysisException: Table or view not found: student_secA;
As the above example illustrates, the first statement executed without any errors and removed the table. The second statement throws an error, as expected, because we are trying to delete a table that is no longer present. If the second syntax is now used on the same table, no error is thrown:
spark.sql('''DROP TABLE IF EXISTS student_secA''')
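Since student_secA was a managed table, its data directory under the warehouse is deleted along with the metadata; a quick check, again assuming a local warehouse path:
import os

# Managed table data lives under <warehouse>/<db_name>.db/<table_name>
print(os.path.isdir(os.path.join(warehouse, "test_db.db", "student_seca")))  # False after the DROP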
The deletion of the table can also be confirmed using the listTables() method:
spark.catalog.listTables()
O/p:
[Table(name='student', database='test_db', description=None,
tableType='MANAGED', isTemporary=False),
Table(name='student_secb', database='test_db',
description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_secc', database='test_db',
description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_secd', database='test_db',
description=None, tableType='MANAGED', isTemporary=False),
Table(name='student_sece', database='test_db',
description=None, tableType='MANAGED', isTemporary=False)]
There are cases where the requirement is to delete all the data in a table but not the table itself. DROP will not work here; instead, Spark provides a TRUNCATE TABLE statement that deletes all the data a table holds while keeping the table. The statement can also be used to delete the data of specific partitions, but that will not be discussed in this article, as partitioning of tables will be covered in a future article. The simple usage syntax of TRUNCATE TABLE is as follows:
TRUNCATE TABLE table_name
spark.sql("""TRUNCATE TABLE student_secC""")
spark.sql("select * from student_secC").show()
Now the table student_secC is as clean as a new table.
One important point to note about most of the SQL statements above is that the database was not explicitly mentioned. This was permissible because the current database was changed at an earlier stage, which then allowed us to work with that database's tables without including the database information in the SQL. If the table you are working with is in a database other than the current one, do not forget to qualify it in the SQL as follows:
Database_name.table_name
Otherwise the following error will be encountered:
spark.sql("USE default")
spark.sql("""TRUNCATE TABLE student_secB""")
O/P:
AnalysisException: Table or view 'student_secb' not found in database 'default';
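Qualifying the table name with its database fixes the error:
spark.sql("TRUNCATE TABLE test_db.student_secB")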
Conclusion:
I believe this article is sufficient to give you a basic idea of Spark managed tables and how to work with them. If you have any suggestions or questions, please post them in the comment box. This article, very much like every article on this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check back whenever you need a reference.
Happy Learning! 😀