Speed Up Data Analytics and Wrangling With Parquet Files

[This article was first published on RStudio | Open source & professional software for data science teams on RStudio, and kindly contributed to R-bloggers.]

This is a guest post from Ryan Garnett, Ray Wong, and Dan Reed from Green Shield Canada. Green Shield Canada, a social enterprise and one of the country’s largest health benefits carriers, currently serves over 4.5 million Canadians across health and dental benefits and pharmacy benefits management. GSC also provides clients with an integrated experience that includes health care delivery via an ever-expanding digital health ecosystem and full benefits administration support.
    The Challenge

    Data is increasing in value for many organizations, an asset leveraged to help make informed business decisions. Unfortunately, this view has not been the norm throughout the life of most organizations, and vast amounts of data remain locked in legacy data management systems and designs. The majority of organizations use relational database management systems (RDBMS) like Oracle, Postgres, Microsoft SQL Server, or MySQL to store and manage their enterprise data. Typically these systems were designed to collect and process data quickly within a transactional data model. While these models are excellent for applications, they can pose challenges for business intelligence, data analytics, or predictive analysis. Many organizations are realizing their legacy systems are not sufficient for data analytics initiatives, which gives analytics teams an opportunity to present tangible options for improving their organization’s data analytics infrastructure.

    Whether you are engineering data for others to analyze or performing the analytics yourself, reducing data processing time is critically important. In this post, we evaluate the performance of two distinct data storage formats: row-based (CSV) and columnar (parquet). CSV is a tried and tested standard in the data analytics field, and parquet is becoming a viable alternative on many data platforms.

    Setup

    We performed the analysis for this post on Green Shield Canada’s analytics workstation. The workstation is a shared resource for our analytics team, running RStudio Workbench with the following configuration:

    Operating system: Ubuntu 20
    Cores: 16
    CPU speed: 2.30 GHz
    RAM: 1 TB

    Load Packages

    We use the following packages throughout the post:

    # Importing data
    library(arrow)
    library(readr)
    
    # Data analysis and wrangling
    library(dplyr)
    
    # Visualization and styling
    library(ggplot2)
    library(gt)
    

    Data Sources

    We store Green Shield Canada’s source data in a transactional data model within an Oracle database. The purpose of the transaction model within Oracle is to quickly adjudicate medical claims within Green Shield’s Advantage application, and it has been performing exceptionally well. While a transactional data model provides great performance for transactional applications, the data model design is less than optimal for data analytics uses. Green Shield Canada, like many organizations, is undergoing a significant digital transformation with a high emphasis on data analytics. During the digital transformation, an analytical data model will be developed, built from many of the source tables currently stored in Oracle database tables, with the need to perform numerous data wrangling tasks.

    Within Green Shield Canada, data is sized based on the following four groups:

    • x-small dataset: < 1M rows (day)
    • small dataset: 1-10M rows (month)
    • medium dataset: 10-100M rows (year)
    • large dataset: 100M-1B rows (decade)
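As a rough illustration of the size groups above, the sketch below maps a row count to its group using base R. The `size_group()` helper is hypothetical (it is not part of our pipeline), and it assumes the lower bound of each range is inclusive, which the list does not state explicitly.

```r
# Hypothetical helper: map a row count to one of the four size groups.
# Assumption: lower bounds are inclusive (e.g. exactly 1M rows counts as "small").
size_group <- function(n_rows) {
  cut(n_rows,
      breaks = c(0, 1e6, 1e7, 1e8, 1e9),
      labels = c("x-small", "small", "medium", "large"),
      right = FALSE)
}

# Row counts taken from the claim history table in this post.
row_counts <- c(claim_history_day   = 317617,
                claim_history_month = 5548609,
                claim_history_year  = 66001292,
                claim_history       = 408197137)

groups <- size_group(row_counts)

data.frame(dataset = names(row_counts),
           rows = unname(row_counts),
           group = groups)
```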

    The main dataset used within the analysis is Green Shield Canada’s claim history data. This dataset includes various data elements related to the transactional claims submitted by Green Shield’s clients. This dataset is critically important to the organization, providing valuable insights into how the company operates and the service provided to our customers. The following is a table with the characteristics related to the claim history dataset.

    Dataset Characteristics
    Claim History Data
    Dataset Size Group  Dataset Name         Number of Rows  Number of Columns  CSV File Size  Parquet File Size
    x-small             claim_history_day    317,617         201                281.8 MB       38.1 MB
    small               claim_history_month  5,548,609       202                4.8 GB         711.9 MB
    medium              claim_history_year   66,001,292      201                57.3 GB        7.5 GB
    large               claim_history        408,197,137     201                351.5 GB       45.1 GB

    The second dataset used within the analysis is Green Shield Canada’s provider data. This dataset includes various data elements related to the provider network that provides medical services for Green Shield Canada’s customers. The following is a table with the characteristics associated with the provider dataset.

    Dataset Characteristics
    Provider Data
    Dataset Name  Number of Rows  Number of Columns  CSV File Size  Parquet File Size
    provider      1,077,046       18                 146.1 MB       31 MB

    The Solution

    Green Shield Canada has decided to convert data sources used for analytics from row-based sources to a columnar format, specifically Apache Parquet.

    Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
    — Apache Foundation

    We leverage the arrow R package to convert our row-based datasets into parquet files. Parquet partitions data into smaller chunks and enables improved performance when filtering against columns that have partitions.
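The conversion step described above can be sketched with a toy dataset. This is a minimal illustration, not our production code: the sample values, the temporary paths, and the choice of PROCESS_MONTH as the partition column are all assumptions made for the example.

```r
library(arrow)
library(dplyr)

# Toy stand-in for a claims extract (values are made up for illustration).
toy_claims <- data.frame(
  CLAIM_STATUS_TYPE_CD = c("PC", "PC", "RJ", "PC"),
  BNFT_TYPE_CD = c("DENTAL", "DRUG", "DRUG", "DENTAL"),
  CH_REND_AMT = c(120.50, 35.00, 80.25, 60.00),
  PROCESS_MONTH = c(1L, 1L, 2L, 2L)
)

# Temporary locations so the example does not touch real data.
csv_path <- tempfile(fileext = ".csv")
parquet_dir <- tempfile("claims_parquet_")
write.csv(toy_claims, csv_path, row.names = FALSE)

# Open the CSV as a dataset and write it back out as parquet,
# with one sub-directory per value of the partition column.
open_dataset(csv_path, format = "csv") %>%
  write_dataset(path = parquet_dir,
                format = "parquet",
                partitioning = "PROCESS_MONTH")

# Lists the partition directories, e.g. PROCESS_MONTH=1/ and PROCESS_MONTH=2/
partition_files <- list.files(parquet_dir, recursive = TRUE)

# Filtering on the partition column lets arrow skip non-matching directories.
january <- open_dataset(parquet_dir) %>%
  filter(PROCESS_MONTH == 1) %>%
  collect()
```

Partitioning on a column that appears in most filters (a time interval, in this sketch) is what lets a query read only a fraction of the files.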

    Parquet file formats have three main benefits for analytical usage:

    • Compression: low storage consumption
    • Speed: efficiently reads data in less time
    • Interoperability: can be read by many different languages

    Converting our datasets from row-based (CSV) to columnar (parquet) has significantly reduced file sizes. Across the five datasets above, the CSV files are 4.7 to 7.8 times the size of their parquet counterparts.
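The size ratios can be reproduced from the two dataset tables with a few lines of base R. The only assumption in this sketch is the unit conversion (1 GB treated as 1,000 MB); using 1,024 MB shifts the small dataset's ratio slightly but leaves the overall range unchanged.

```r
# File sizes from the dataset tables, converted to MB.
# Assumption: 1 GB = 1000 MB.
sizes <- data.frame(
  dataset = c("claim_history_day", "claim_history_month",
              "claim_history_year", "claim_history", "provider"),
  csv_mb = c(281.8, 4800, 57300, 351500, 146.1),
  parquet_mb = c(38.1, 711.9, 7500, 45100, 31)
)

# How many times larger each CSV file is than its parquet counterpart.
sizes$ratio <- round(sizes$csv_mb / sizes$parquet_mb, 1)

sizes
range(sizes$ratio)
```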

    We will explore computationally expensive tasks in both data engineering and data analysis processes. We will perform four specific tasks on all four of the data size groups (x-small, small, medium, and large) produced from our claim history dataset.

    1. join provider information to claim history
    2. processed claims volume by benefit type per time interval (i.e., day, month, and/or year)
    3. processed claims statistics by benefit type per time interval (i.e., day, month, and/or year)
    4. provider information with processed claims statistics by benefit type per time interval (i.e., day, month, and/or year)
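Each task in the sections that follow is timed by taking `Sys.time()` before and after the pipeline. Subtracting two `Sys.time()` values returns a `difftime` whose units are chosen automatically, which is why some results are reported in seconds and others in minutes. The helper below is a hypothetical sketch (the name `time_task()` is ours for illustration, and it is not used in the benchmark code) of how the same pattern could be wrapped once and always report seconds.

```r
# Hypothetical helper: time an expression and always report seconds.
time_task <- function(label, expr) {
  start <- Sys.time()
  # R evaluates arguments lazily, so `expr` only runs here, after the clock starts.
  result <- force(expr)
  elapsed <- as.numeric(difftime(Sys.time(), start, units = "secs"))
  message(label, ": ", round(elapsed, 3), " secs")
  invisible(list(result = result, seconds = elapsed))
}

# Usage on a built-in dataset, standing in for a group_by + summarize task.
timing <- time_task("toy summarize", {
  aggregate(mpg ~ cyl, data = mtcars, FUN = mean)
})

timing$seconds
```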

    X-Small Data

    The x-small data consists of data collected on a single day in January 2021. The dataset has 317,617 rows of data.

    CSV Data

    The CSV file used in this section was 281.8 MB in size.

    # Task 1 - join
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    provider_columns <-
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_DAY.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id"))
      
    end <- Sys.time()
    end - start
    

    The task took 16.006 secs to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_DAY.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n))
      
    end <- Sys.time()
    end - start
    

    The task took 10.84989 secs to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_DAY.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task took 11.8559 secs to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    provider_columns <-
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_DAY.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description, BNFT_TYPE_CD) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task took 16.02928 secs to execute.

    Parquet Data

    The parquet file used in this section was 38.1 MB in size.

    # Task 1 - join
    start <- Sys.time()
    
    left_join(
      open_dataset(sources = "/home/data/CLAIM_HISTORY_DAY") %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID, BNFT_TYPE_CD, CH_REND_AMT),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id,
               provider_type,
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 1.776429 secs to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    
    open_dataset(sources = "/home/data/CLAIM_HISTORY_DAY") %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD) %>%
      group_by(BNFT_TYPE_CD) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n)) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 0.7456837 secs to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    
    open_dataset(sources = "/home/data/CLAIM_HISTORY_DAY") %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, CH_REND_AMT) %>%
      group_by(BNFT_TYPE_CD) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 0.2979383 secs to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    left_join(
      open_dataset(sources = "/home/data/CLAIM_HISTORY_DAY") %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID, BNFT_TYPE_CD, CH_REND_AMT),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id,
               provider_type,
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description, BNFT_TYPE_CD) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 1.359842 secs to execute.

    Small Data

    The small data consists of data collected in January 2021. The dataset has 5,548,609 rows of data.

    CSV Data

    The CSV file used in this section was 4.8 GB in size.

    # Task 1 - join
    start <- Sys.time()
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    provider_columns <-
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_MONTH.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id"))
      
    end <- Sys.time()
    end - start
    

    The task took 3.677011 mins to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_DAY = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_MONTH.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD, PROCESS_DAY) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n))
      
    end <- Sys.time()
    end - start
    

    The task took 3.161771 mins to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_DAY = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_MONTH.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD, PROCESS_DAY) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task took 3.095256 mins to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_DAY = col_double())
    
    provider_columns <-
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_MONTH.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description,
               BNFT_TYPE_CD,
               PROCESS_DAY) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task took 3.44803 mins to execute.

    Parquet Data

    The parquet file used in this section was 711.9 MB in size.

    # Task 1 - join
    start <- Sys.time()
    
    left_join(
      open_dataset(
        sources = "/home/data/CLAIM_HISTORY_MONTH"
      ) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID,
               BNFT_TYPE_CD, 
               CH_REND_AMT),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id, provider_type, benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 1.604066 secs to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    
    open_dataset(sources = "/home/data/CLAIM_HISTORY_MONTH") %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, PROCESS_DAY) %>%
      group_by(BNFT_TYPE_CD, PROCESS_DAY) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n)) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 0.3016093 secs to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    
    open_dataset(sources = "/home/data/CLAIM_HISTORY_MONTH") %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, CH_REND_AMT, PROCESS_DAY) %>%
      group_by(BNFT_TYPE_CD, PROCESS_DAY) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 0.5149045 secs to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    left_join(
      open_dataset(
        sources = "/home/data/CLAIM_HISTORY_MONTH"
      ) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID,
               BNFT_TYPE_CD, 
               CH_REND_AMT, 
               PROCESS_DAY),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id, 
               provider_type,
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description,
               BNFT_TYPE_CD, 
               PROCESS_DAY) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 1.12566 secs to execute.

    Medium Data

    The medium data consists of data collected over 2021. The dataset has 66,001,292 rows of data.

    CSV Data

    The CSV file used in this section was 57.3 GB in size.

    # Task 1 - join
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    provider_columns <- 
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_YEAR.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id"))
      
    end <- Sys.time()
    end - start
    

    The task took 40.19741 mins to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_MONTH = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_YEAR.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD, PROCESS_MONTH) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n))
      
    end <- Sys.time()
    end - start
    

    The task took 38.88081 mins to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_MONTH = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_YEAR.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD, PROCESS_MONTH) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount =
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount =
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task took 37.73755 mins to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_MONTH = col_double())
    
    provider_columns <-
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_YEAR.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description,
               BNFT_TYPE_CD, 
               PROCESS_MONTH) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task took 40.0343 mins to execute.

    Parquet Data

    The parquet file used in this section was 7.5 GB in size.

    # Task 1 - join
    start <- Sys.time()
    
    left_join(
      open_dataset(
        sources = "/home/data/CLAIM_HISTORY_YEAR"
      ) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID, BNFT_TYPE_CD, CH_REND_AMT),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id,
               provider_type, 
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 4.153103 secs to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    
    open_dataset(sources = "/home/data/CLAIM_HISTORY_YEAR") %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, PROCESS_MONTH) %>%
      group_by(BNFT_TYPE_CD, PROCESS_MONTH) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n)) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 0.844259 secs to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    
    open_dataset(sources = "/home/data/CLAIM_HISTORY_YEAR") %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, CH_REND_AMT, PROCESS_MONTH) %>%
      group_by(BNFT_TYPE_CD, PROCESS_MONTH) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 1.010546 secs to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    left_join(
      open_dataset(
        sources = "/home/data/CLAIM_HISTORY_YEAR"
      ) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID, 
               BNFT_TYPE_CD,
               CH_REND_AMT, 
               PROCESS_MONTH),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id,
               provider_type, 
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description,
               BNFT_TYPE_CD, 
               PROCESS_MONTH) %>%
      summarize(minimum_amount = 
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 3.062172 secs to execute.

    Large Data

    The large data consists of data collected between 2014 and 2022. The dataset has 408,197,137 rows of data.

    CSV Data

    The CSV file used in this section was 351.5 GB in size.

    # Task 1 - join
    start <- Sys.time()
    
    claims_columns <- 
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double())
    
    provider_columns <- 
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_DECADE.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id"))
      
    end <- Sys.time()
    end - start
    

    The task did not complete, producing Error: std::bad_alloc.

    # Task 2 - group_by + count
    start <- Sys.time()
    claims_columns <- 
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_YEAR = col_double(),
                PROCESS_MONTH = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_DECADE.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD, PROCESS_YEAR, PROCESS_MONTH) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n))
      
    end <- Sys.time()
    end - start
    

    The task did not complete, producing Error: std::bad_alloc.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_YEAR = col_double(),
                PROCESS_MONTH = col_double())
    
    read_csv("/home/data/CLAIM_HISTORY_DECADE.csv",
             col_types = claims_columns) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      group_by(BNFT_TYPE_CD, PROCESS_YEAR, PROCESS_MONTH) %>%
      summarize(minimum_amount = 
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task did not complete, producing Error: std::bad_alloc.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    claims_columns <-
      cols_only(CLAIM_STATUS_TYPE_CD = col_character(),
                CH_SUBM_PROVIDER_ID = col_double(),
                BNFT_TYPE_CD = col_character(),
                CH_REND_AMT = col_double(),
                PROCESS_YEAR = col_double(),
                PROCESS_MONTH = col_double())
    
    provider_columns <- 
      cols_only(provider_id = col_double(),
                provider_type = col_character(),
                benefit_description = col_character())
    
    left_join(
      read_csv("/home/data/CLAIM_HISTORY_DECADE.csv",
               col_types = claims_columns) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC"),
      read_csv("/home/data/PROVIDER.csv",
               col_types = provider_columns),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description, 
               BNFT_TYPE_CD, 
               PROCESS_YEAR, 
               PROCESS_MONTH) %>%
      summarize(minimum_amount = 
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup()
      
    end <- Sys.time()
    end - start
    

    The task did not complete, producing Error: std::bad_alloc.

    Parquet Data

    The parquet file used in this section was 45.1 GB in size.

    # Task 1 - join
    start <- Sys.time()
    
    left_join(
      open_dataset(
        sources = "/home/data/CLAIM_HISTORY_DECADE"
      ) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID, BNFT_TYPE_CD, CH_REND_AMT),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id, 
               provider_type, 
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 16.42989 secs to execute.

    # Task 2 - group_by + count
    start <- Sys.time()
    
    open_dataset(
      sources = "/home/data/CLAIM_HISTORY_DECADE"
    ) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, PROCESS_YEAR, PROCESS_MONTH) %>%
      group_by(BNFT_TYPE_CD, PROCESS_YEAR, PROCESS_MONTH) %>%
      count() %>%
      ungroup() %>%
      arrange(desc(n)) %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 4.389257 secs to execute.

    # Task 3 - group_by + summarize
    start <- Sys.time()
    
    open_dataset(
      sources = "/home/data/CLAIM_HISTORY_DECADE"
    ) %>%
      filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
      select(BNFT_TYPE_CD, CH_REND_AMT, PROCESS_YEAR, PROCESS_MONTH) %>%
      group_by(BNFT_TYPE_CD, PROCESS_YEAR, PROCESS_MONTH) %>%
      summarize(minimum_amount = 
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 4.441824 secs to execute.

    # Task 4 - join + group_by + summarize
    start <- Sys.time()
    
    left_join(
      open_dataset(
        sources = "/home/data/CLAIM_HISTORY_DECADE"
      ) %>%
        filter(CLAIM_STATUS_TYPE_CD == "PC") %>%
        select(CH_SUBM_PROVIDER_ID, BNFT_TYPE_CD, CH_REND_AMT),
      open_dataset(sources = "/home/data/Provider") %>%
        select(provider_id,
               provider_type, 
               benefit_description),
      by = c("CH_SUBM_PROVIDER_ID" = "provider_id")) %>%
      group_by(benefit_description, BNFT_TYPE_CD) %>%
      summarize(minimum_amount =
                  min(CH_REND_AMT, na.rm = TRUE),
                mean_amount = 
                  mean(CH_REND_AMT, na.rm = TRUE),
                max_amount = 
                  max(CH_REND_AMT, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()
      
    end <- Sys.time()
    end - start
    

    The task took 14.93252 secs to execute.
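
    The timings in this post come from `end - start`, which lets R pick the display unit (seconds for short runs, minutes for longer ones). A minimal sketch of a helper that always reports elapsed seconds, so CSV and parquet runs are directly comparable, could look like the following. `time_task()` is a hypothetical name introduced here for illustration; it was not part of our benchmark code.

```r
# Hypothetical helper (not from the original benchmark):
# evaluate an expression and return the elapsed time in seconds.
time_task <- function(expr) {
  start <- Sys.time()
  force(expr)  # the lazily passed expression is evaluated here
  end <- Sys.time()
  as.numeric(difftime(end, start, units = "secs"))
}

# Stand-in workload; a real task would be the full pipeline ending in collect().
elapsed <- time_task(Sys.sleep(0.2))
elapsed
```

    To time one of the tasks above, the whole pipeline (including `collect()`) would be passed as the argument, since the work only happens when the result is collected.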

    Our Findings

    The results from our analysis were remarkable. Converting our data from row-based to columnar in parquet format significantly improved processing time. Processes that would take tens of minutes to an hour are now possible within seconds…game changer! The parquet format is a low/no-cost solution that provides immediate analytical improvements for both our data engineering and data analytics teams.
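
    For readers who want to try the conversion themselves, the following is a minimal sketch of one way to turn CSV files into a parquet dataset with the arrow package. It is not necessarily how our data was converted: the temporary directories and the tiny stand-in table are assumptions made so the example is self-contained, with column names borrowed from the queries above.

```r
library(arrow)
library(dplyr)

# Assumption: temporary directories and a tiny stand-in table,
# used only so that the sketch runs on its own.
csv_dir <- file.path(tempdir(), "claims_csv")
parquet_dir <- file.path(tempdir(), "claims_parquet")
dir.create(csv_dir, showWarnings = FALSE)

write.csv(
  data.frame(
    BNFT_TYPE_CD = c("A", "B", "A"),
    CH_REND_AMT = c(10.5, 20, 30.25)
  ),
  file.path(csv_dir, "claims.csv"),
  row.names = FALSE
)

# Open the CSV directory as an arrow dataset (no R data frame is created here) ...
csv_data <- open_dataset(sources = csv_dir, format = "csv")

# ... and write it back out in parquet format.
write_dataset(csv_data, path = parquet_dir, format = "parquet")

# Read the parquet copy back to confirm the rows survived the round trip.
converted <- open_dataset(sources = parquet_dir) %>% collect()
nrow(converted)
```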

    Processing Time

    CSV processing time varied from 10.85 seconds to 2,411.84 seconds (40.2 minutes) across the three dataset size groups that completed, whereas parquet processing time ranged from 0.3 seconds to 16.43 seconds across all four dataset size groups. Note that the CSV large dataset failed with Error: std::bad_alloc and did not complete. Error: std::bad_alloc is synonymous with out-of-memory; yes, insufficient memory even with our 1TB workstation!

    Improvement Factor

    Not only did our processing efficiency improve across all dataset size categories, but the storage size efficiency of the same datasets is also not to be overlooked. Being able to run common analytical queries faster and with a smaller footprint is an irrefutable win. Optimizing both size and speed is an attainable, quantifiable innovation for any data engineer or analyst, and it is beneficial for any organization.

    The following illustrates the improvement factor (the number of times faster parquet is than CSV) for each of the four tasks, as well as the storage size improvements obtained using columnar storage.

    Processing Improvements with Parquet Files

    | Dataset Size Group | Task                        | CSV Processing Time (seconds) | Parquet Processing Time (seconds) | Parquet Improvement Factor |
    |--------------------|-----------------------------|-------------------------------|-----------------------------------|----------------------------|
    | x-small            | join                        | 16.01                         | 1.78                              | 9                          |
    | x-small            | group_by + count            | 10.85                         | 0.75                              | 15                         |
    | x-small            | group_by + summarize        | 11.86                         | 0.30                              | 40                         |
    | x-small            | join + group_by + summarize | 16.03                         | 1.36                              | 12                         |
    | small              | join                        | 220.62                        | 1.60                              | 138                        |
    | small              | group_by + count            | 189.71                        | 0.30                              | 629                        |
    | small              | group_by + summarize        | 185.72                        | 0.51                              | 361                        |
    | small              | join + group_by + summarize | 206.88                        | 1.13                              | 184                        |
    | medium             | join                        | 2,411.84                      | 4.15                              | 581                        |
    | medium             | group_by + count            | 2,332.85                      | 0.84                              | 2,763                      |
    | medium             | group_by + summarize        | 2,264.25                      | 1.01                              | 2,241                      |
    | medium             | join + group_by + summarize | 2,402.06                      | 3.06                              | 784                        |
    | large              | join                        | NA                            | 16.43                             | NA                         |
    | large              | group_by + count            | NA                            | 4.39                              | NA                         |
    | large              | group_by + summarize        | NA                            | 4.44                              | NA                         |
    | large              | join + group_by + summarize | NA                            | 14.93                             | NA                         |

    Note: the CSV large dataset did not complete, producing Error: std::bad_alloc.
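
    The improvement factor appears to be the CSV processing time divided by the parquet processing time. A minimal sketch of that calculation for the x-small rows follows; the data frame and column names are made up for illustration, and because the inputs are the rounded times from the table, the results can differ slightly from the published factors.

```r
# Times (in seconds) copied from the x-small rows of the table above.
# The data frame and its column names are illustrative assumptions.
timings <- data.frame(
  task = c("join",
           "group_by + count",
           "group_by + summarize",
           "join + group_by + summarize"),
  csv_secs = c(16.01, 10.85, 11.86, 16.03),
  parquet_secs = c(1.78, 0.75, 0.30, 1.36)
)

# Improvement factor: how many times faster parquet was than CSV.
timings$improvement_factor <- timings$csv_secs / timings$parquet_secs
timings
```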

    Average Processing Improvements with Parquet Files, by Dataset Size Group

    | Dataset Size Group | Average Parquet Improvement Factor |
    |--------------------|------------------------------------|
    | x-small            | 19                                 |
    | small              | 328                                |
    | medium             | 1,592                              |
    | large              | NA                                 |

    Note: the CSV large dataset did not complete, producing Error: std::bad_alloc.
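
    The group averages can be reproduced from the per-task improvement factors reported earlier. The sketch below uses base R only; the object and column names are illustrative, and the large group is left out because the CSV runs did not complete.

```r
# Per-task improvement factors as published in the processing table
# (four tasks per dataset size group; the large group has no CSV result).
factors <- data.frame(
  size_group = rep(c("x-small", "small", "medium"), each = 4),
  improvement_factor = c(9, 15, 40, 12,
                         138, 629, 361, 184,
                         581, 2763, 2241, 784)
)

# Mean improvement factor per dataset size group.
avg <- aggregate(improvement_factor ~ size_group, data = factors, FUN = mean)
avg
```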

    Closing Remarks

    The time it takes to process data impacts all users: data engineers, data analysts, data scientists, decision makers, business users, and clients. Reducing processing time improves the experience for everyone along the data journey. Parquet files allow analytical teams to significantly reduce the time they spend on data engineering, modelling, and data analytics. Because parquet does not require all the data to be read into memory prior to analysis, the file format provides an option for all organizations, regardless of their existing data infrastructure investment.
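
    The point about not reading everything into memory can be seen in how arrow defers work. In the sketch below (a small illustration using the built-in `mtcars` data and a temporary directory, both assumptions made for the example), the pipeline builds a query object first, and rows are only pulled into R when `collect()` is called.

```r
library(arrow)
library(dplyr)

# Assumption: a small demo dataset written to a temporary directory.
parquet_dir <- file.path(tempdir(), "lazy_demo")
write_dataset(mtcars, path = parquet_dir, format = "parquet")

# Building the pipeline does not load the data into an R data frame.
query <- open_dataset(sources = parquet_dir) %>%
  filter(cyl == 4) %>%
  select(mpg, cyl)

class(query)  # a query object, not a data frame

# Only now are the matching rows and selected columns materialized in R.
result <- collect(query)
dim(result)
```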

    Analytics looks to provide value to the business; many times it focuses on improving the efficiency of models or adding new technology. Sometimes we can get significant improvements that deliver value to the business with simple solutions, like changing data storage formats. Boring, yes, but 1,500 times faster processing is super!

