This series of posts is intended to introduce the uninitiated SQL Server professional to the data warehouse in Azure Synapse Analytics. This data warehouse was formerly known as Azure SQL Data Warehouse, distinct from Azure SQL Database. Further back, Azure SQL Data Warehouse was implemented as the PDW (Parallel Data Warehouse) appliance. This post is the second of three and focuses on a design example that illustrates the concepts used to leverage the MPP architecture underlying a Synapse Analytics data warehouse (DW – SQL pool). The first blog in this series discussed the advantages of an MPP architecture over an SMP architecture for processing massive data in parallel. As in the first blog, Azure Synapse Analytics Serverless (On Demand) is outside the scope of this discussion.
Synapse Analytics data warehouses are implemented on a Massively Parallel Processing platform, as depicted in Figure 1. Requests are sent to the control node, which runs an instance of SQL Server. This node distributes work to the compute nodes, each being a shard, where the work is executed and results are passed back to the control node as needed. Any further processing of the results from the shards is done on the control node.
Three distribution types are supported: ROUND_ROBIN, REPLICATE, and HASH. They were described in the prior post.
ROUND_ROBIN and REPLICATE are straightforward and need no further discussion (see Part 1 of this series). A HASH distribution, on the other hand, requires some thought, as several considerations influence which column is selected. In a HASH distribution, the data is organized among the shards based on a hashed value of the designated column; the distribution depends on only one column. The goal in making this choice is to allow as many operations as possible to be pushed down to the individual shards, where they can be performed in parallel.
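To make the three options concrete, the sketch below shows how each distribution is declared on a dedicated SQL pool table. The table and column names are hypothetical and only illustrate the syntax.

```sql
-- HASH: rows are assigned to shards by hashing the designated column.
CREATE TABLE dbo.sales_fact
(
    customer_id BIGINT        NOT NULL,
    sale_date   DATE          NOT NULL,
    amount      DECIMAL(18,2) NOT NULL
)
WITH ( DISTRIBUTION = HASH (customer_id) );

-- ROUND_ROBIN: rows are spread evenly with no regard to their values.
CREATE TABLE dbo.staging_sales
(
    customer_id BIGINT        NOT NULL,
    sale_date   DATE          NOT NULL,
    amount      DECIMAL(18,2) NOT NULL
)
WITH ( DISTRIBUTION = ROUND_ROBIN );

-- REPLICATE: a full copy of the table is maintained on every shard.
CREATE TABLE dbo.dim_region
(
    region_id   INT           NOT NULL,
    region_name NVARCHAR(100) NOT NULL
)
WITH ( DISTRIBUTION = REPLICATE );
```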
If a poor choice is made for the distribution hash, most query plans will show data movement operations. The offending operations are SHUFFLE and BROADCAST. A shuffle operation indicates that data must be moved between shards, or to the control node, to satisfy a request. A broadcast operation indicates that a set of data is copied to all shards to satisfy a request. Both operations are expensive, and any choice made should avoid them to the extent possible.
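One way to spot these operations, sketched below against the hypothetical tables from the previous block, is to run EXPLAIN on a query or to inspect the steps of a completed request through the dedicated SQL pool views sys.dm_pdw_exec_requests and sys.dm_pdw_request_steps.

```sql
-- Show the distributed plan without executing the query; grouping on a column
-- other than the distribution column will typically surface a ShuffleMoveOperation.
EXPLAIN
SELECT s.sale_date, SUM(s.amount)
FROM dbo.sales_fact AS s
GROUP BY s.sale_date;

-- Inspect the steps of requests that have already run and look for data movement.
SELECT r.request_id, s.step_index, s.operation_type, s.row_count, s.total_elapsed_time
FROM sys.dm_pdw_exec_requests AS r
JOIN sys.dm_pdw_request_steps AS s ON s.request_id = r.request_id
WHERE s.operation_type IN ('ShuffleMoveOperation', 'BroadcastMoveOperation')
ORDER BY s.total_elapsed_time DESC;
```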
The column chosen for hashing should be one that:
- Will provide an even distribution of the data for load and for retrieval to the extent possible. The higher the cardinality of the data, the more likely there will be an even distribution of the data.
- Will be used in JOIN operations of any variant (inner, outer, etc.) so that the join can be performed on the shard rather than on the control node; the goal is that no data needs to be moved to satisfy the join (a sketch of a distribution-aligned join and GROUP BY follows this list).
- Will be used in GROUP BY operations so that partial results can be generated on the shards rather than single-threading the operation on the control node.
- Will not be used in WHERE conditions that force large quantities of data to be retrieved from a single shard.
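As a small illustration of the join and GROUP BY criteria above, consider two hypothetical tables hashed on the same column. Because both the join key and the grouping key match the distribution column, each shard can join and aggregate its own rows and only the partial results need to reach the control node.

```sql
-- Both tables are hash-distributed on customer_id (hypothetical example).
CREATE TABLE dbo.orders
(
    customer_id BIGINT        NOT NULL,
    order_id    BIGINT        NOT NULL,
    amount      DECIMAL(18,2) NOT NULL
)
WITH ( DISTRIBUTION = HASH (customer_id) );

CREATE TABLE dbo.payments
(
    customer_id BIGINT        NOT NULL,
    payment_id  BIGINT        NOT NULL,
    amount      DECIMAL(18,2) NOT NULL
)
WITH ( DISTRIBUTION = HASH (customer_id) );

-- Join key and grouping key equal the distribution column, so each shard can
-- process its own rows; no SHUFFLE or BROADCAST is needed.
SELECT o.customer_id, COUNT_BIG(*) AS order_payment_pairs
FROM dbo.orders   AS o
JOIN dbo.payments AS p ON p.customer_id = o.customer_id
GROUP BY o.customer_id;
```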
Dates are generally not considered good candidates for a hashing column, because loads normally occur in temporal order, which would drive all activity to one or two shards. As an aside, a datetime might be acceptable since there are many possible values within a given date, but it is unlikely that such a granular datetime would commonly be used in a join or group by.
Our use case is the collection of click information by persons browsing web pages owned by the tenants. Tenants represent the clients we gather data for. The persons are potential customers of the tenant. These persons are not customers until they can be identified as customers.
For this exercise, a stream of data describing web page activities of varying types is received. This data is stored in the click_stream table. Records flow from the streamed data in batches to the data warehouse. The person key within the ingested data is ingested_person_id (a random GUID) plus tenant_id (a bigint). The key to the streamed records is the person key plus the request_time, id, and type columns. The person_id_mapping table is a list of distinct ingested_person_id and tenant_id combinations with a generated, unique, sequential mapped_person_id. This table is not strictly necessary, but it provides a much narrower sort key for most queries (a single bigint rather than a GUID plus a bigint).
Raw click information will be stored in a fact table, click. The data in the dim_clicks dimension table is descriptive and specific to a tenant. The source of this dimensional data is the tenant. While the actual load of this table is not important to this exercise, the choice of distribution is pertinent.
At some time, we may be able to identify a person. This too will be a fact stored in the known_person table. Since multiple tenants may use the same click_id, this table is keyed by the tenant_id/click_id.
The diagram in Figure 2 represents how one might structure the tables for a standard SQL Server implementation.
A customer is only ‘identified’ if there is a record in the known_person table (one of the record types) for the person key combination in the click_stream table.
- There are 50 billion records in the click table.
- There are about 500 million known_person records.
- Approximately 1 billion person_id_mapping records are generated.
- dim_clicks contains more than 30 thousand rows.
- For each mapped_person_id, we mostly care about the most recent record for any given value in the id column; across all tables these will be referred to as the ‘current’ records.
- Person counts for Tenants can be highly skewed.
- The tenant_id is stored redundantly in the fact tables. The reason for this is that we need to provide for row level security based on the tenant_id.
Before moving on, pause for a moment to consider how we might distribute the data in each of the tables.
How should the data be distributed among the shards and indexed?
dim_clicks table
This is a dimension table with relatively few rows. Records from this table will be needed on all shards to look up the descriptor information. This leads us to a REPLICATE distribution for this table, where a full copy of the table is maintained on every shard.
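A possible declaration for the dimension table is sketched below. The column list is illustrative only, and the clustered index anticipates the indexing decision discussed later.

```sql
CREATE TABLE dbo.dim_clicks
(
    click_id    BIGINT        NOT NULL,
    tenant_id   BIGINT        NOT NULL,
    description NVARCHAR(200) NULL
)
WITH
(
    DISTRIBUTION = REPLICATE,              -- full copy maintained on every shard
    CLUSTERED INDEX (tenant_id, click_id)  -- index choice discussed below
);
```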
The person_id_mapping and known_person tables could be viewed either as dimension or fact tables (degenerate dimensions, in data warehousing vernacular). Due to the volume of data, a REPLICATE distribution is not appropriate, so these tables will be considered with the fact tables.
person_id_mapping, click, and known_person tables
It is difficult to analyze any of these tables in isolation. The choice made here for any of the tables will be dependent on choices made for the others. There is no good reason to consider REPLICATE for these tables. ROUND_ROBIN will distribute the data evenly, but execution plans will show that SHUFFLE operations will be used to satisfy most requests. That leaves us with a HASH distribution, but on which column?
There are two columns common to all of the fact tables: mapped_person_id and tenant_id. A third column, request_datetime, is common to two of the tables. Reviewing each of the three columns:
- tenant_id – Data for tenant_id can be skewed so distribution will likely not be even. In addition, since all analysis will be filtered for tenant, all retrievals will be from the single shard where that tenant resides. So not a good candidate.
- request_datetime – This column could very well provide an even distribution. However, the column is unlikely to be used in a join other than for generating the ‘current’ record. Since the current record depends on retrieving the MAX value while grouping on mapped_person_id, the values for a given person would be spread across multiple shards, forcing the processing onto the control node.
- mapped_person_id – This is a surrogate for the combination of ingested_person_id and tenant_id. Most queries will rely on a join from the fact tables to the mapping table. Further, the person will be used in most GROUP BYs, allowing at least partial, intermediate grouped results to be generated on the shards in parallel. This is the most viable candidate for the HASH distribution.
From the above analysis, we choose to distribute these tables on a HASH of the mapped_person_id. This makes the most sense as most analyses will be based on persons. All data for a given person will be co‑located on one shard.
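With all rows for a person on one shard, the ‘current record’ lookup can be computed shard-locally. A sketch follows, assuming the id and request_datetime columns named in the text order the versions of a record.

```sql
-- Latest record per (mapped_person_id, id). The GROUP BY key includes the
-- distribution column, so each shard aggregates only its own rows.
SELECT c.*
FROM dbo.click AS c
JOIN (
        SELECT mapped_person_id, id, MAX(request_datetime) AS latest_dt
        FROM dbo.click
        GROUP BY mapped_person_id, id
     ) AS cur
  ON  cur.mapped_person_id = c.mapped_person_id
  AND cur.id               = c.id
  AND cur.latest_dt        = c.request_datetime;
```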
That leaves the click_stream table. With this design there is no reason to use a hashed distribution for this table: the column used to distribute the fact tables is generated from the data in this table and so is not available in the stream. The table will be quite large, so a REPLICATE distribution does not make sense, as a full copy of this large table would be placed on every shard. That leaves a ROUND_ROBIN distribution.
The streamed data and the dimension tables are stored as heaps. A clustered index should be added to the dimension table. A non-clustered index should be added to the click_stream table on the keys plus the processed column; this column, in addition to the keys, is used to orchestrate the batch processes that load the streamed data into the fact tables.
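A sketch of the non-clustered index just described, assuming the key columns named earlier (the person key plus request_time, id, and type) and the processed flag; the index name and column order are assumptions.

```sql
-- Non-clustered index on the click_stream heap: the keys plus the processed
-- column used to orchestrate the batch loads into the fact tables.
CREATE INDEX IX_click_stream_keys
ON dbo.click_stream (tenant_id, ingested_person_id, request_time, id, type, processed);
```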
The fact tables are all sufficiently large that a CLUSTERED COLUMNSTORE INDEX should be used. Additional indexes might be useful, but there is insufficient information in this example to determine any further indexing.
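The fact tables could then be declared along the lines below; the column list is abbreviated and, beyond the columns named in the text, hypothetical.

```sql
CREATE TABLE dbo.click
(
    mapped_person_id BIGINT    NOT NULL,
    tenant_id        BIGINT    NOT NULL,
    id               BIGINT    NOT NULL,
    type             INT       NOT NULL,
    request_datetime DATETIME2 NOT NULL
    -- ... remaining click attributes ...
)
WITH
(
    DISTRIBUTION = HASH (mapped_person_id),
    CLUSTERED COLUMNSTORE INDEX
);

-- known_person and person_id_mapping follow the same pattern:
-- DISTRIBUTION = HASH (mapped_person_id), CLUSTERED COLUMNSTORE INDEX.
```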
As an aside, and apart from the scope of this blog, the streamed data is archived as it is processed by copying the processed records into a separate table and deleting the original records. Because of this process, it is advantageous to include a clustered index on an additional column that is assured to be ever increasing, a timestamp perhaps. This index ensures that deleted space in the stream table is not scanned to find the end of the data as new records are streamed in. Note that one should also include a maintenance procedure to clean up the deleted space.
Design Exercise – An Alternative
After working with the above design for some period, it was observed that inserts of previously unseen persons into the person_id_mapping table were taking an unacceptable amount of time. The issue is that the maximum existing mapped_person_id must be retrieved and a counter, generated with ROW_NUMBER(), added to that value; the larger the table, the longer the insert takes. To address this, a new design was adopted that uses an IDENTITY column to generate the mapped_person_id.
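For context, the slow pattern being replaced looked roughly like the sketch below. This is an illustrative reconstruction, not the post’s actual code; the column names follow the text and the processed filter is an assumption.

```sql
-- Assign new surrogate keys by adding ROW_NUMBER() to the current maximum.
-- Both the MAX lookup and the anti-join grow more expensive as the table grows.
INSERT INTO dbo.person_id_mapping (mapped_person_id, ingested_person_id, tenant_id)
SELECT
    (SELECT ISNULL(MAX(mapped_person_id), 0) FROM dbo.person_id_mapping)
      + ROW_NUMBER() OVER (ORDER BY s.ingested_person_id) AS mapped_person_id,
    s.ingested_person_id,
    s.tenant_id
FROM ( SELECT DISTINCT ingested_person_id, tenant_id
       FROM dbo.click_stream
       WHERE processed = 0 ) AS s
WHERE NOT EXISTS ( SELECT 1
                   FROM dbo.person_id_mapping AS m
                   WHERE m.ingested_person_id = s.ingested_person_id
                     AND m.tenant_id          = s.tenant_id );
```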
A person_id_mapping_identity table has been added to the diagram. There are also now relationships between the click_stream and person_id_mapping tables. This was true of the prior design, but the relationship was not displayed to avoid confusion as to how distributions might be designed. Any data added to the person_id_mapping_identity table must be duplicated in the person_id_mapping table.
Why the redundant tables? The primary issue with a single table with an identity is that the identity column cannot be used as the HASH distribution column. Any other distribution on this table would cause shuffle/broadcast operations when looking up the ingested_person_id, which is nearly always done. Further, with a little additional analysis, we can improve the overall processing of the data from the click_stream table to the fact tables.
The distributions on the fact and dimension tables remain the same as in the simple design example. What needs to be (re)considered is the click_stream table and the new person_id_mapping_identity table. For convenience, I will group these tables as Stage tables.
click_stream, person_id_mapping_identity tables
The earlier analysis held that the click_stream table should be organized with a ROUND_ROBIN distribution. With this design we now have a table, person_id_mapping_identity, that cannot be distributed on a hash of the mapped_person_id column. Is there still a good choice? The ingested_person_id has all the attributes that made mapped_person_id the choice for the simple design, so the person_id_mapping_identity table will be hashed on ingested_person_id. Now there is also a reason to distribute the click_stream table in a manner other than ROUND_ROBIN. The ingested_person_id column is common to both tables, and new records are inserted into the person_id_mapping_identity table by a query that sources records from the click_stream table and uses an anti-join to establish that the ingested_person_id/tenant_id combination does not yet exist in the person_id_mapping_identity table. It is therefore advantageous to distribute the click_stream table on a hash of ingested_person_id as well.
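A sketch of the identity-based mapping table and the anti-join insert follows. The GUID column type and the column order are assumptions; with both this table and click_stream hashed on ingested_person_id, the NOT EXISTS check stays shard-local.

```sql
CREATE TABLE dbo.person_id_mapping_identity
(
    mapped_person_id   BIGINT IDENTITY(1,1) NOT NULL,  -- surrogate key; an IDENTITY column cannot be the distribution column
    ingested_person_id UNIQUEIDENTIFIER     NOT NULL,
    tenant_id          BIGINT               NOT NULL
)
WITH ( DISTRIBUTION = HASH (ingested_person_id) );

-- New persons only: the anti-join compares rows that hash to the same shard,
-- so no data movement is required to check for an existing mapping.
INSERT INTO dbo.person_id_mapping_identity (ingested_person_id, tenant_id)
SELECT DISTINCT cs.ingested_person_id, cs.tenant_id
FROM dbo.click_stream AS cs
WHERE NOT EXISTS ( SELECT 1
                   FROM dbo.person_id_mapping_identity AS m
                   WHERE m.ingested_person_id = cs.ingested_person_id
                     AND m.tenant_id          = cs.tenant_id );
```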
No changes to the indexing decisions are indicated here compared to the simple example.
The following table recaps the choices made.

| Table | Distribution | Index |
| --- | --- | --- |
| dim_clicks | REPLICATE | Clustered index |
| click | HASH (mapped_person_id) | Clustered columnstore index |
| known_person | HASH (mapped_person_id) | Clustered columnstore index |
| person_id_mapping | HASH (mapped_person_id) | Clustered columnstore index |
| person_id_mapping_identity | HASH (ingested_person_id) | (not specified) |
| click_stream | ROUND_ROBIN (simple design); HASH (ingested_person_id) (alternative design) | Heap with a non-clustered index on the keys plus processed |
An Azure Synapse Analytics data warehouse can provide scale and performance when querying massive quantities of data, if designed correctly. The rules differ from those one would apply when designing for Azure SQL Database. Here we have considered some of the aspects one must weigh when deciding how to distribute a table across the shards of a Synapse Analytics data warehouse. The first blog in this series examined the differences between SMP and MPP architectures. The third part of this series will discuss the elements of T-SQL that differ when applied to an Azure Synapse Analytics data warehouse workload.