SafeGraph is a leading geospatial data company that specializes in curating and managing over 45 million global points of interest (POI). Our comprehensive dataset includes detailed attributes such as brand affiliation, advanced category tagging, open hours, and precise polygons. To produce and deliver this high-precision data product to our customers, we build our data processing stack on top of Apache Spark and run more than 1,000 Spark applications daily.
Traceability in the context of data processing refers to the ability to trace data changes and consistently recreate the same results from data processing workflows. Traceability is essential for delivering accurate data products and for strengthening our internal ability to debug and fix data issues. Like many other companies, we encountered the significant and prevalent challenge of achieving traceability. Some typical examples include:
Specifically, we recognize several significant challenges in striving for traceable data processing:
To address these challenges, SafeGraph has developed an architecture called DataChronicle. In this blog post, we will delve into the design and implementation of DataChronicle and highlight how it operates in SafeGraph's production environment, serving as a robust foundation for building high-quality data products.
The above diagram depicts the DataChronicle architecture. DataChronicle consists of the following components:
From the audit storage, users can retrieve the data versions linked to each Spark application that read or wrote the data. These version numbers can then be passed as parameters when launching Spark applications in order to reproduce specific results. Moreover, the CDC (Change Data Capture) tables in the data warehouse help users precisely identify any anomalous changes in the data.
In the next sections we will give details of each component in DataChronicle.
The DataChronicle library is central to the DataChronicle architecture. It acts as the interface connecting data processing jobs with the data warehouse and model registry. Through its APIs, jobs can read/write versioned tables in the data warehouse and access data models from the registry.
The above figure shows how Spark applications interact with the data warehouse and model registry through the DataChronicle APIs. The DataChronicle library serves two purposes:
Instead of exposing the APIs of Apache Iceberg and MLflow to users directly, DataChronicle masks the technical details and provides unified APIs such as read/insert/upsert/delete for the data warehouse and uploadModel/fetchModelByStage/fetchModelByVersion for the model registry.
This design significantly simplifies our data platform engineering work and makes it future-proof.
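To make this concrete, below is a minimal Scala sketch of what such a facade might look like from a user's perspective. Only the method names come from the APIs listed above; the signatures, parameter shapes, and return types are illustrative assumptions rather than the actual DataChronicle interfaces.

```scala
import org.apache.spark.sql.DataFrame

// Illustrative sketch of the DataChronicle facade; only the method names are
// taken from this post, the signatures and parameters are hypothetical.
trait VersionedTable {
  def read(table: String, version: Option[Long] = None): DataFrame
  def insert(table: String, df: DataFrame): Long                  // returns the new version
  def upsert(table: String, df: DataFrame, keys: Seq[String]): Long
  def delete(table: String, predicate: String): Long
}

trait ModelRegistry {
  def uploadModel(name: String, artifactPath: String): String     // returns the model version
  def fetchModelByStage(name: String, stage: String): String      // returns a local artifact path
  def fetchModelByVersion(name: String, version: String): String
}
```

Because jobs program against these interfaces rather than against Delta Lake, Apache Iceberg, or MLflow directly, the underlying implementation can evolve without touching application code.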
As illustrated in the above figure, when users create a new table managed by the DataChronicle library, they can choose either Delta Lake or Apache Iceberg as the underlying format. Once the format is selected, users can use the DataChronicle library APIs to generate versioned tables and perform a wide range of operations, from basic tasks like reading and writing to more advanced ones like incremental reading and upserting. Two major reasons led us to design an abstraction on top of Delta Lake and Apache Iceberg:
Uncertainties of open source projects: During the initial phase of our endeavor to establish a foundation for traceability in data processing jobs over two years ago, we encountered notable uncertainties surrounding the open-source projects Delta Lake and Apache Iceberg. Apache Iceberg was still in its early stages of development, with various missing features and bugs. Delta Lake posed its own challenges, as its open-source version significantly differed from the private version used by Databricks.
Complexities in Delta Lake/Apache Iceberg: Another key reason driving the development of an API layer on top of Delta Lake and Apache Iceberg is the intention to shield users from certain complexities inherent in these two formats. For instance, with Delta Lake, the absence of dynamic partition overwrite led to using replaceWhere [2][3]. Educating a growing team on its use became challenging, prompting the creation of DataChronicle library APIs to generate replaceWhere statements automatically. Similarly, earlier Apache Iceberg versions posed schema “time-traveling” difficulties [4], addressed by APIs that abstracted these intricacies.
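As an example of the complexity we hide, the snippet below shows the kind of replaceWhere statement that users would otherwise write by hand when overwriting a partition with open-source Delta Lake; the DataChronicle library derives this predicate automatically from the data being written. The replaceWhere writer option is a real Delta Lake feature, but the table path, partition column, and helper function here are purely illustrative.

```scala
import org.apache.spark.sql.DataFrame

// Overwrite only the partition present in `df` by spelling out a matching
// replaceWhere predicate. Path and partition column are illustrative.
def overwritePartition(df: DataFrame, path: String, date: String): Unit = {
  df.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", s"ds = '$date'")   // must match the data being written
    .save(path)
}
```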
The unified abstraction offered by the DataChronicle library allows us to swiftly establish the groundwork for traceable data processing while ensuring adaptability to future advancements in open-source technology. We began with Delta Lake due to its robustness and gradually transitioned to Apache Iceberg, which evolved to be more mature and gained more community traction. By encapsulating Delta Lake APIs within DataChronicle library APIs and calling them in data processing jobs, the shift to Apache Iceberg required minimal adjustments. Users are not burdened with learning new concepts or SQL commands exclusive to Apache Iceberg; they continue seamlessly with familiar DataChronicle APIs. Migration mainly involves specifying Apache Iceberg instead of Delta Lake as the format when creating versioned tables.
Providing a consistent handling strategy for various scenarios is pivotal in averting confusion and unexpected behavior across data processing jobs.
One example scenario we address is how to deal with a changed schema between two versions of data. A silent schema change upstream is one of the most common root causes of cascading failures downstream. Inconsistent handling across data processing jobs, where some silently allow schema changes and others explicitly reject them, creates even more challenges. Within the DataChronicle library, we adopt a conservative approach of disallowing schema changes by default. Nevertheless, we offer users the flexibility to enable schema changes and configure notification mechanisms, including Slack integration.
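A minimal sketch of what this policy might look like from a job author's point of view is shown below. The default-deny behavior and the Slack notification are the parts described above; the type and field names are hypothetical.

```scala
// Hypothetical sketch of the schema-change policy; only the default-deny behavior
// and the Slack notification come from the post, the names are illustrative.
case class SchemaPolicy(
  allowSchemaChange: Boolean = false,         // conservative default: fail on schema change
  notifySlackChannel: Option[String] = None   // where to announce accepted schema changes
)

// A job that knowingly evolves the schema opts in explicitly, e.g. when calling
// a write API such as insert(table, df, policy):
val policy = SchemaPolicy(allowSchemaChange = true, notifySlackChannel = Some("#data-alerts"))
```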
Another example is about concurrency control. This addresses situations where multiple Spark applications or threads within a single Spark application use the same version of an Apache Iceberg table as input but generate diverse outputs via different data transformations and then concurrently commit the outputs as the new versions. A uniform strategy for this scenario simplifies user tasks and boosts job reliability without failing conflicted commits too aggressively.
For instance, we enable up to 1000 retries for append-only concurrent commits from the same Spark application. Additionally, we use a DynamoDB-based distributed lock to coordinate commits across different Spark applications, sparing users from intricate distributed-system challenges and ensuring predictable behavior during concurrent commits.
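The sketch below shows the kind of Iceberg settings that back this strategy: a commit retry count on the table and a DynamoDB-based lock manager on the catalog. The configuration keys (commit.retry.num-retries, lock-impl, lock.table) are standard Iceberg properties, though they may vary by Iceberg version, while the catalog name, warehouse location, DynamoDB table name, and retry count are illustrative. In DataChronicle these details are applied for users rather than set by hand.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative catalog setup: an Iceberg catalog backed by a DynamoDB lock manager
// so that commits from different Spark applications are coordinated.
val spark = SparkSession.builder()
  .appName("concurrent-commit-example")
  .config("spark.sql.catalog.warehouse", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.warehouse.type", "hadoop")
  .config("spark.sql.catalog.warehouse.warehouse", "s3://example-bucket/warehouse")
  .config("spark.sql.catalog.warehouse.lock-impl",
          "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager")
  .config("spark.sql.catalog.warehouse.lock.table", "iceberg_commit_locks")
  .getOrCreate()

// Allow many retries for append-only commits that conflict only on metadata.
spark.sql(
  "ALTER TABLE warehouse.db.events SET TBLPROPERTIES ('commit.retry.num-retries' = '1000')")
```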
The purpose of the ingestion framework is to enable users to easily pull data from dynamic sources such as RDS, Label Studio, and S3 buckets, and save it in Apache Iceberg format in the data warehouse.
As shown in the figure below, we build our ingestion framework on top of Apache Airflow and Spark. Airflow serves as the orchestration engine that periodically triggers the jobs, and Apache Spark is the main processing engine responsible for reading and writing the data.
Users in SafeGraph can easily define their ingestion Spark applications using a YAML file, which is hosted on our config server [5]. Below is a YAML configuration that demonstrates setting up a daily job for ingesting data from an S3 bucket and an hourly job for ingesting data from an RDS source.
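The snippet below is an illustrative sketch of such a configuration. The keys source_type, source_s3_format, and merge_schema are the ones discussed in this post; the overall structure, the remaining field names, and all values (bucket paths, table names, schedules) are hypothetical.

```yaml
# Illustrative sketch only: source_type, source_s3_format, and merge_schema are
# mentioned in this post; the structure and remaining fields are hypothetical.
ingestions:
  - name: poi_attributes_daily
    schedule: "@daily"
    source_type: s3
    source_s3_format: csv
    source_path: s3://example-bucket/poi-attributes/
    merge_schema: false
    destination_table: warehouse.ingestion.poi_attributes
    spark_options:
      header: "true"

  - name: labels_hourly
    schedule: "@hourly"
    source_type: aws rds
    source_rds_table: labels
    merge_schema: true
    destination_table: warehouse.ingestion.labels
```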
Tailoring our approach to diverse data sources, we give users distinct configuration capabilities. For Spark applications that read CSV files from S3, for example, users can specify the data source type with source_type (s3 or aws rds), define the S3 input format with source_s3_format (e.g. csv), and regulate the behavior of the Spark application, such as whether to automatically evolve the ingested table schema, with merge_schema. These configurations extend to source-specific Spark options, such as whether a CSV file contains a header.
For the streamlined scheduling of data ingestion Spark applications via Airflow, we have built a DAG generator within our Airflow environment. This generator loads the aforementioned YAML file from the configuration server and creates a DAG for each designated scheduling interval. Within each DAG, individual Airflow tasks are instantiated automatically, and each task corresponds to a specific data source outlined in the configuration.
This design abstracts away the intricacies of ingesting data into a data warehouse, shielding users from unnecessary complexity. Their sole requirement is to draft a YAML file and commit it to our dedicated configuration server. The ingestion framework manages everything else, from reading and writing the data to orchestrating the execution of the Spark applications.
We empower users not only to generate standard ingested data tables, but also to create Change Data Capture (CDC) tables that explicitly highlight additions, deletions, and updates in the data.
To understand how CDC helps capture changes in a table, consider the following example. The table below contains two columns, v1 and v2, and two rows: (v1=1, v2=1) and (v1=2, v2=2).
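| v1 | v2 |
|----|----|
| 1  | 1  |
| 2  | 2  |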
After we UPSERT (v1=1, v2=2), (v1=2, v2=3), and (v1=4, v2=4) based on column v1, the table evolves into the following:
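| v1 | v2 |
|----|----|
| 1  | 2  |
| 2  | 3  |
| 4  | 4  |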
However, with only the above table we cannot tell the origin of each row as there are multiple possible operations which could generate this table.
Below is an example of a CDC table generated using Apache Iceberg; it shows the data changes captured by the UPSERT operation. By recording that (v1=1, v2=1) and (v1=2, v2=2) were DELETEd and that the corresponding new rows (v1=1, v2=2) and (v1=2, v2=3) were INSERTed, we can easily build the change lineage. With this table, users can track the data changes between two ingestions. They can also parse the table contents and implement incremental processing by consuming only the updated rows.
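(The rendering below is illustrative: the change rows follow directly from the example above, while the metadata column name mirrors the one exposed by Iceberg's changelog and may differ slightly in the actual output.)

| v1 | v2 | _change_type |
|----|----|--------------|
| 1  | 1  | DELETE       |
| 2  | 2  | DELETE       |
| 1  | 2  | INSERT       |
| 2  | 3  | INSERT       |
| 4  | 4  | INSERT       |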
As emphasized at the beginning of this blog post, achieving traceability in large-scale data processing requires not only technical solutions like the DataChronicle library but also the right strategy for rolling them out at scale. To finish this final stretch, we focus on two problems:
While the DataChronicle library and Ingestion framework render data inputs traceable for data processing applications, the automatic auditing mechanism aims to actualize this traceability by linking input versions to their corresponding Spark applications.
The cornerstone of the automatic auditing mechanism lies in the utilization of audit logs, as illustrated in the initial examples of this blog post:
Requiring users to explicitly make a record of every read/write in Spark application code is neither reliable nor sustainable. Therefore, we implement automatic auditing in the DataChronicle library by wrapping its main read/write functionality with code that extracts the necessary information, such as the version number and the Spark application ID, and sends it to the audit logging storage system.
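Below is a minimal Scala sketch of this wrapping idea, assuming an Iceberg table read pinned to a specific snapshot. The function and type names, as well as the audit sink, are hypothetical; only the idea of capturing the version and application ID around each call reflects what the library does.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch: every DataChronicle read/write also emits an audit record.
case class AuditEntry(sparkAppId: String, table: String, operation: String, version: Long)

def auditedRead(spark: SparkSession, table: String, version: Long): DataFrame = {
  val df = spark.read
    .option("snapshot-id", version)            // pin the exact Iceberg snapshot being read
    .table(table)
  val entry = AuditEntry(spark.sparkContext.applicationId, table, "read", version)
  sendToAuditStorage(entry)                    // hypothetical sink for the audit storage
  df
}

def sendToAuditStorage(entry: AuditEntry): Unit = {
  // Placeholder: the real system persists entries to the audit logging storage.
  println(entry)
}
```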
Our automatic audit logging mechanism also handles a typical race condition on audit data: a Spark application can have multiple threads reading and writing tables, and multiple Spark applications can read and write the same table at the same time. Consider the following scenario: Spark application A commits a new version X of a table, and before A looks up the latest table version to record it in the audit log, Spark application B commits another version Y to the same table.
In this case, we would produce a wrong audit log entry: because of the race condition, version Y, which was written by Spark application B, would be associated with Spark application A.
To resolve this challenge, we attach a version tag to each newly written version and record it in the audit log. The version tag consists of the Spark application ID, the thread ID, and the epoch timestamp at which the audit log entry is generated. Correspondingly, the DataChronicle library provides functionality for searching for a version by its version tag. In the scenario above, even if a new version written by Spark application B becomes the latest version of the table, Spark application A can still find the latest version written by itself by searching with the version tag built from its own Spark application ID, thread ID, and so on, and thus generate the correct audit log entry.
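A sketch of this idea is shown below, assuming the tag is stored in the Iceberg snapshot summary. The tag format, the summary key, and the helper used to load the Iceberg table are illustrative assumptions, not the exact DataChronicle implementation.

```scala
import scala.collection.JavaConverters._
import org.apache.iceberg.Table
import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.SparkSession

// The tag combines the Spark application ID, thread ID, and an epoch timestamp,
// so the writing application can later find its own snapshot.
def versionTag(spark: SparkSession): String =
  s"${spark.sparkContext.applicationId}-${Thread.currentThread().getId}-${System.currentTimeMillis()}"

// After committing, look up the snapshot carrying our tag instead of blindly taking
// the table's current (latest) version, which another application may have written.
def findVersionByTag(spark: SparkSession, tableName: String, tag: String): Option[Long] = {
  val table: Table = Spark3Util.loadIcebergTable(spark, tableName)
  table.snapshots().asScala
    .find(s => Option(s.summary().get("datachronicle.version-tag")).contains(tag))
    .map(_.snapshotId())
}
```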
With the automatic auditing built into the DataChronicle architecture, users neither need to remember to record every read/write nor deal with the complexities of guaranteeing the correctness of audit log entries.
One of the biggest challenges in leveraging Apache Iceberg to version the inputs is rolling out the changes to 1000+ Spark applications.
One of the critical changes involves SparkSQL's Catalog concept introduced in Spark 3.x, which Apache Iceberg uses to manage tables. Instead of referring to a table as “DatabaseName.TableName”, users need to refer to Apache Iceberg tables as “CatalogName.DatabaseName.TableName”. Teaching this new concept to a growing team takes effort. Additionally, users could configure inconsistent catalog names for Apache Iceberg across different Spark applications, leading to inconsistency and non-reusable source code.
To resolve this issue, we have developed a module called the SparkSQL Extension Shim in the DataChronicle library. Because Apache Iceberg registers its catalogs via the SparkSQL extension [5], the SparkSQL Extension Shim calls private APIs in Spark to invisibly inject the catalog configurations into the SparkSQL extension. In each Spark application, when users call any DataChronicle library API for the first time, the relevant configuration is automatically injected, so users do not need to make any code changes to add the configurations.
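The configuration being injected is roughly equivalent to what a user would otherwise set up manually when building a Spark session, as sketched below. The configuration keys are standard Spark and Iceberg settings, while the catalog name ("dc") and catalog type are illustrative; the actual shim applies them through Spark internals after the session already exists.

```scala
import org.apache.spark.sql.SparkSession

// What users would otherwise configure by hand; the shim makes this transparent.
// Catalog name "dc" and catalog type are illustrative.
val spark = SparkSession.builder()
  .appName("iceberg-catalog-example")
  .config("spark.sql.extensions",
          "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.dc", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.dc.type", "hive")
  .getOrCreate()

// Without the shim, users would also have to reference tables with the catalog prefix:
// spark.table("dc.database.table")
```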
The SparkSQL Extension Shim not only seamlessly integrates Apache Iceberg with Spark applications but also effectively conceals internal concepts like Catalog from users and eliminates concerns about inconsistent catalog names across Spark applications.
In this blog post, we unveiled DataChronicle, the foundation of SafeGraph's architecture for traceable large-scale data processing and for shipping precise data products to customers. Central to it is the DataChronicle library, which offers versioned read/write functionality while maintaining a unified abstraction and consistent behavior across Spark applications. Our streamlined ingestion framework simplifies converting dynamic inputs into traceable, versioned tables. Finally, automatic auditing and seamless SparkSQL integration let us achieve tracing with minimal overhead.