Best practices for migrating from BigQuery to ClickHouse Cloud

This document describes best practices for migrating from BigQuery to ClickHouse.

Pre-requisites

You should already be familiar with ClickHouse core concepts and data modeling.

Choosing a primary key

As in BigQuery, ClickHouse doesn't enforce uniqueness for a table's primary key column values.

Similar to clustering in BigQuery, a ClickHouse table's data is stored on disk ordered by the primary key columns. The query optimizer uses this sort order to avoid re-sorting, minimize memory usage for joins, and enable short-circuiting for LIMIT clauses. In contrast to BigQuery, ClickHouse automatically creates a (sparse) primary index based on the primary key column values. This index is used to speed up all queries that contain filters on the primary key columns. Specifically:

  • Memory and disk efficiency are paramount at the scale at which ClickHouse is often used. Data is written to ClickHouse tables in chunks known as parts, with rules applied for merging the parts in the background. In ClickHouse, each part has its own primary index. When parts are merged, the merged parts' primary indexes are also merged. Note that these indexes aren't built for each row. Instead, the primary index for a part has one index entry per group of rows; this technique is called sparse indexing.
  • Sparse indexing is possible because ClickHouse stores the rows for a part on disk ordered by a specified key. Instead of directly locating single rows (like a B-Tree-based index), the sparse primary index allows it to quickly (via a binary search over index entries) identify groups of rows that could possibly match the query. The located groups of potentially matching rows are then, in parallel, streamed into the ClickHouse engine to find the matches. This index design allows for the primary index to be small (it completely fits into the main memory) while still significantly speeding up query execution times, especially for range queries that are typical in data analytics use cases. For more details, we recommend this in-depth guide.
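
The sparse-index idea described above can be sketched in a few lines of Python. This is a simplified toy model, not ClickHouse's implementation: the names `build_sparse_index`, `candidate_granules`, and `lookup` are hypothetical, and the granule size of 4 is chosen for readability (ClickHouse's default `index_granularity` is 8192 rows).

```python
from bisect import bisect_left, bisect_right

GRANULE_SIZE = 4  # illustrative only; ClickHouse defaults to 8192 rows per granule


def build_sparse_index(sorted_keys, granule_size=GRANULE_SIZE):
    """Keep only the first key of each granule: one index entry per group of rows."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granule_size)]


def candidate_granules(index, lo, hi):
    """Binary-search the index for granules that could hold keys in [lo, hi]."""
    # Granule g holds keys between index[g] and index[g + 1], so the first
    # candidate is the granule just before the first entry >= lo.
    first = max(bisect_left(index, lo) - 1, 0)
    # The last candidate is the last granule whose first key is <= hi.
    last = bisect_right(index, hi) - 1
    return range(first, last + 1)


def lookup(sorted_keys, index, lo, hi, granule_size=GRANULE_SIZE):
    """Scan only the candidate granules and keep the rows that really match."""
    matches = []
    for g in candidate_granules(index, lo, hi):
        rows = sorted_keys[g * granule_size:(g + 1) * granule_size]
        matches.extend(k for k in rows if lo <= k <= hi)
    return matches


keys = list(range(0, 40, 2))      # 20 rows, already sorted by the key
index = build_sparse_index(keys)  # 5 entries instead of 20
print(index)
print(list(candidate_granules(index, 10, 18)))
print(lookup(keys, index, 10, 18))
```

The index holds 5 entries for 20 rows, and the range query reads 2 of the 5 granules. Some rows in those granules don't match, which is the trade-off a sparse index makes in exchange for its small size.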

The selected primary key in ClickHouse will determine not only the index but also the order in which data is written on disk. Because of this, it can dramatically impact compression levels, which can, in turn, affect query performance. An ordering key that causes the values of most columns to be written in a contiguous order will allow the selected compression algorithm (and codecs) to compress the data more effectively.

All columns in a table will be sorted based on the value of the specified ordering key, regardless of whether they're included in the key itself. For instance, if CreationDate is used as the key, the order of values in all other columns will correspond to the order of values in the CreationDate column. Multiple ordering keys can be specified - this will order with the same semantics as an ORDER BY clause in a SELECT query.
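
To illustrate why the ordering key affects compression, the following sketch compresses each column of a small table separately, once in insert order and once sorted by a two-column key. It rests on loudly stated assumptions: the rows are randomly generated rather than real data, `zlib` stands in for the ClickHouse codecs, and the helper names are hypothetical.

```python
import random
import zlib

random.seed(0)  # fixed seed so the comparison is reproducible

# Hypothetical rows: (post_type, score). Not taken from the real dataset.
rows = [
    (random.choice(["Question", "Answer", "Wiki"]), random.randint(0, 50))
    for _ in range(10_000)
]


def compressed_size(values):
    """Bytes needed for one column after zlib (a stand-in for a ClickHouse codec)."""
    return len(zlib.compress("\n".join(map(str, values)).encode()))


def column_sizes(table_rows):
    """Compress each column on its own, as a column store would."""
    post_types, scores = zip(*table_rows)
    return compressed_size(post_types), compressed_size(scores)


insert_order = column_sizes(rows)
# Tuple sorting has the same semantics as ORDER BY (post_type, score).
key_order = column_sizes(sorted(rows))

print("insert order (post_type, score):", insert_order)
print("key order    (post_type, score):", key_order)
```

Both columns shrink once the rows are sorted, because sorting places equal values next to each other. The absolute numbers depend on the codec and the data, so treat this as a directional illustration only.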

Partitioning best practices

Coming from BigQuery, you'll be familiar with the concept of table partitioning for enhancing performance and manageability for large databases by dividing tables into smaller, more manageable pieces called partitions. Partitions allow administrators to organize data based on specific criteria like date ranges or geographical locations, and can be achieved using either a range on a specified column (e.g., dates), defined lists, or via a hash on a key.

Partitioning helps with improving query performance by enabling faster data access through partition pruning and more efficient indexing. It also helps maintenance tasks such as backups and data purges by allowing operations on individual partitions rather than the entire table. Additionally, partitioning can significantly improve the scalability of BigQuery databases by distributing the load across multiple partitions.

In ClickHouse, partitioning is specified when a table is first defined, via the PARTITION BY clause. This clause can contain a SQL expression on one or more columns, the result of which determines the partition a row is sent to.

Data parts are logically associated with each partition on disk and can be queried in isolation. In the example below, the posts table gets partitioned by year using the expression toYear(CreationDate). As rows are inserted into ClickHouse, this expression will be evaluated against each row. Rows are then routed to the resulting partition in the form of new data parts belonging to that partition.

CREATE TABLE posts
(
    `Id` Int32 CODEC(Delta(4), ZSTD(1)),
    `PostTypeId` Enum8('Question' = 1, 'Answer' = 2, 'Wiki' = 3, 'TagWikiExcerpt' = 4, 'TagWiki' = 5, 'ModeratorNomination' = 6, 'WikiPlaceholder' = 7, 'PrivilegeWiki' = 8),
    `AcceptedAnswerId` UInt32,
    `CreationDate` DateTime64(3, 'UTC'),
...
    `ClosedDate` DateTime64(3, 'UTC')
)
ENGINE = MergeTree
ORDER BY (PostTypeId, toDate(CreationDate), CreationDate)
PARTITION BY toYear(CreationDate)
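
The routing step can be modeled in Python as follows. This is a toy model under stated assumptions: `insert_block` and `ordering_key` are hypothetical helpers, rows are plain dictionaries, and background merges are ignored. It shows that each insert produces one new part for every partition it touches, with the rows inside each part sorted by the table's ordering key.

```python
from collections import defaultdict
from datetime import datetime


def ordering_key(row):
    """Mirrors ORDER BY (PostTypeId, toDate(CreationDate), CreationDate)."""
    created = row["CreationDate"]
    return (row["PostTypeId"], created.date(), created)


def insert_block(table, rows):
    """Route rows by toYear(CreationDate); each touched partition gets one new part."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["CreationDate"].year].append(row)
    for partition, part_rows in groups.items():
        table[partition].append(sorted(part_rows, key=ordering_key))


table = defaultdict(list)  # partition value -> list of parts (sorted row lists)

insert_block(table, [
    {"PostTypeId": 1, "CreationDate": datetime(2008, 8, 1, 12, 0)},
    {"PostTypeId": 2, "CreationDate": datetime(2009, 1, 5, 9, 0)},
    {"PostTypeId": 1, "CreationDate": datetime(2008, 7, 31, 21, 42)},
])
insert_block(table, [
    {"PostTypeId": 1, "CreationDate": datetime(2009, 3, 2, 8, 30)},
])

parts_per_partition = {p: len(parts) for p, parts in sorted(table.items())}
print(parts_per_partition)
```

After the two inserts, partition 2008 holds one part and partition 2009 holds two. In ClickHouse, the two 2009 parts would later be merged in the background; parts from different partitions are never merged with each other.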

Partitioning as a data management feature

TL;DR

You should treat partitioning primarily as a data management technique. It's ideal for time series data that must eventually expire from the cluster, since the oldest partition can simply be dropped.

Partitioning in ClickHouse has similar applications to those in BigQuery. However, in ClickHouse you should principally consider partitioning a data management feature, not a query optimization technique. By separating data logically based on a key, each partition can be operated on independently. This is useful, for example, when you want to expire data using TTL. It also allows you to move partitions, and thus subsets of the data, between storage tiers efficiently over time, or to delete data from the cluster efficiently.

In the example below, posts from 2008 are removed:

SELECT DISTINCT partition
FROM system.parts
WHERE `table` = 'posts'

┌─partition─┐
│ 2008      │
│ 2009      │
│ 2010      │
│ 2011      │
│ 2012      │
│ 2013      │
│ 2014      │
│ 2015      │
│ 2016      │
│ 2017      │
│ 2018      │
│ 2019      │
│ 2020      │
│ 2021      │
│ 2022      │
│ 2023      │
│ 2024      │
└───────────┘

17 rows in set. Elapsed: 0.002 sec.

ALTER TABLE posts
(DROP PARTITION '2008')

Ok.

0 rows in set. Elapsed: 0.103 sec.

Partitioning as a query optimization technique

Use partitioning for query optimization with care

Ensure your partitioning key expression doesn't produce a high-cardinality set of values; avoid creating more than 100 partitions. For example, don't partition your data by high-cardinality columns such as client identifiers or names. Instead, make the client identifier or name the first column in the ORDER BY expression.

While partitioning in ClickHouse should be considered primarily a data management technique, it can in some situations help with query performance. This depends heavily on the access patterns. If queries target only a few partitions (ideally a single partition), performance can potentially improve. This is only typically useful if the partitioning key isn't in the primary key, and you're filtering on it.

Note however that queries that need to cover many partitions may perform worse than if no partitioning is used. This is because there may be more parts created as a result of partitioning.

Internally, ClickHouse creates parts for inserted data. As more data is inserted, the number of parts increases. To prevent an excessively high number of parts, which would degrade query performance (because there are more files to read), parts are merged together in an asynchronous background process. If the number of parts exceeds a pre-configured limit, ClickHouse throws a "too many parts" exception on insert. This shouldn't happen under normal operation and only occurs if ClickHouse is misconfigured or used incorrectly, e.g., with many small inserts. Since parts are created per partition in isolation, increasing the number of partitions increases the number of parts, i.e., the part count is a multiple of the number of partitions. High-cardinality partitioning keys can therefore cause this error and should be avoided.
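
A quick back-of-the-envelope model shows how the partitioning key drives the part count. It assumes that one insert creates at least one part per distinct partition value it contains; the function `parts_created` and the sample batch are hypothetical.

```python
def parts_created(rows, partition_key):
    """Lower bound on new parts from one insert: one per distinct partition value."""
    return len({partition_key(row) for row in rows})


# Hypothetical insert batch of 1,000 rows: (client_id, year)
batch = [(i % 500, 2023 + i % 2) for i in range(1000)]

by_year = parts_created(batch, lambda row: row[1])    # low-cardinality key
by_client = parts_created(batch, lambda row: row[0])  # high-cardinality key

print("PARTITION BY year:     ", by_year, "parts per insert")
print("PARTITION BY client_id:", by_client, "parts per insert")
```

In this model the same 1,000-row insert yields 2 parts when partitioned by year but 500 parts when partitioned by client identifier, and every later insert adds a similar number again. That growth is what makes background merging fall behind.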

The benefit of targeting a single partition will be less pronounced, even non-existent, if the partitioning key is already an early entry in the primary key.

Partitioning can also be used to optimize GROUP BY queries if values in each partition are unique. In general, you should ensure your primary key is optimized, and only consider partitioning as a query optimization technique in exceptional cases where queries target a specific, predictable subset of the data, e.g., partitioning by day when most queries cover the last day.

Using materialized views and projections

ClickHouse's concept of projections allows you to specify multiple ORDER BY clauses for a table.

In ClickHouse data modeling, we explore how materialized views can be used in ClickHouse to pre-compute aggregations, transform rows, and optimize queries for different access patterns. For the latter, we provided an example where the materialized view sends rows to a target table with a different ordering key to the original table receiving inserts.

For example, consider the following query:

SELECT avg(Score)
FROM comments
WHERE UserId = 8592047

   ┌──────────avg(Score)─┐
   │ 0.18181818181818182 │
   └─────────────────────┘
1 row in set. Elapsed: 0.040 sec. Processed 90.38 million rows, 361.59 MB (2.25 billion rows/s., 9.01 GB/s.)
Peak memory usage: 201.93 MiB.

This query requires all 90 million rows to be scanned (albeit quickly), as UserId isn't part of the ordering key. Previously, we solved this using a materialized view acting as a lookup for the PostId.

The same problem can also be solved with a projection. The command below adds a projection with ORDER BY UserId.

ALTER TABLE comments ADD PROJECTION comments_user_id (
SELECT * ORDER BY UserId
)

ALTER TABLE comments MATERIALIZE PROJECTION comments_user_id

We first create the projection and then materialize it. This latter command causes the data to be stored twice on disk in two different orders.
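
The effect of storing the data in two orders can be sketched with a toy model. Everything here is an assumption made for illustration: the rows are invented, the function names are hypothetical, and a plain binary search over a sorted list stands in for ClickHouse's sparse primary index.

```python
from bisect import bisect_left, bisect_right

# Hypothetical rows: (PostId, UserId, Score)
comments = [
    (1, 30, 2), (2, 10, 0), (3, 20, 5),
    (4, 10, 1), (5, 30, 0), (6, 10, 2),
]

base = sorted(comments)                            # table order: ORDER BY PostId
projection = sorted(comments, key=lambda r: r[1])  # projection: ORDER BY UserId
projection_keys = [r[1] for r in projection]


def avg_score_full_scan(user_id):
    """Without the projection, every row has to be checked."""
    scores = [score for _, uid, score in base if uid == user_id]
    return sum(scores) / len(scores), len(base)


def avg_score_projection(user_id):
    """With the projection, a binary search finds the matching slice directly."""
    lo = bisect_left(projection_keys, user_id)
    hi = bisect_right(projection_keys, user_id)
    scores = [score for _, _, score in projection[lo:hi]]
    return sum(scores) / len(scores), hi - lo


print(avg_score_full_scan(10))   # (average, rows read)
print(avg_score_projection(10))  # same average, fewer rows read
```

Both functions return the same average, but the second reads only the rows for the requested user. The cost is the second sorted copy of the data, which mirrors the extra storage that a materialized projection needs.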

The projection can also be defined when the table is created, and will be automatically maintained as data is inserted. This is shown below:

CREATE TABLE comments
(
    `Id` UInt32,
    `PostId` UInt32,
    `Score` UInt16,
    `Text` String,
    `CreationDate` DateTime64(3, 'UTC'),
    `UserId` Int32,
    `UserDisplayName` LowCardinality(String),
    PROJECTION comments_user_id
    (
    SELECT *
    ORDER BY UserId
    )
)
ENGINE = MergeTree
ORDER BY PostId

If the projection is created via an ALTER command, the creation is asynchronous when the MATERIALIZE PROJECTION command is issued. You can confirm the progress of this operation with the following query, waiting for is_done=1.

SELECT
    parts_to_do,
    is_done,
    latest_fail_reason
FROM system.mutations
WHERE (`table` = 'comments') AND (command LIKE '%MATERIALIZE%')

   ┌─parts_to_do─┬─is_done─┬─latest_fail_reason─┐
1. │           1 │       0 │                    │
   └─────────────┴─────────┴────────────────────┘

1 row in set. Elapsed: 0.003 sec.

Repeating the above query, you can see that performance has improved significantly at the expense of additional storage.

SELECT avg(Score)
FROM comments
WHERE UserId = 8592047

   ┌──────────avg(Score)─┐
1. │ 0.18181818181818182 │
   └─────────────────────┘
1 row in set. Elapsed: 0.008 sec. Processed 16.36 thousand rows, 98.17 KB (2.15 million rows/s., 12.92 MB/s.)
Peak memory usage: 4.06 MiB.

With an EXPLAIN command, you can also confirm the projection was used to serve this query:

EXPLAIN indexes = 1
SELECT avg(Score)
FROM comments
WHERE UserId = 8592047

    ┌─explain─────────────────────────────────────────────┐
 1. │ Expression ((Projection + Before ORDER BY))         │
 2. │   Aggregating                                       │
 3. │   Filter                                            │
 4. │           ReadFromMergeTree (comments_user_id)      │
 5. │           Indexes:                                  │
 6. │           PrimaryKey                                │
 7. │           Keys:                                     │
 8. │           UserId                                    │
 9. │           Condition: (UserId in [8592047, 8592047]) │
10. │           Parts: 2/2                                │
11. │           Granules: 2/11360                         │
    └─────────────────────────────────────────────────────┘

11 rows in set. Elapsed: 0.004 sec.

When to use projections

Projections are an appealing feature for new users as they're automatically maintained as data is inserted. Furthermore, queries can just be sent to a single table where the projections are exploited where possible to speed up the response time.

This is in contrast to materialized views, where the user has to select the appropriate optimized target table or rewrite their query, depending on the filters. This places greater emphasis on user applications and increases client-side complexity.

Despite these advantages, projections come with inherent limitations that you should be aware of, so they should be deployed sparingly. For further details, see "materialized views versus projections".

We recommend using projections when:

  • A complete reordering of the data is required. While the expression in the projection can, in theory, use a GROUP BY, materialized views are more effective for maintaining aggregates. The query optimizer is also more likely to exploit projections that use a simple reordering, i.e., SELECT * ORDER BY x. You can select a subset of columns in this expression to reduce storage footprint.
  • Users are comfortable with the associated increase in storage footprint and overhead of writing data twice. Test the impact on insertion speed and evaluate the storage overhead.

Rewriting BigQuery queries in ClickHouse

The following provides example queries comparing BigQuery to ClickHouse. This list aims to demonstrate how to exploit ClickHouse features to significantly simplify queries. The examples here use the full Stack Overflow dataset (up to April 2024).

Users (with more than 10 questions) that receive the most views:

SELECT OwnerDisplayName, sum(ViewCount) AS total_views
FROM stackoverflow.posts
WHERE (PostTypeId = 1) AND (OwnerDisplayName != '')
GROUP BY OwnerDisplayName
HAVING count(*) > 10
ORDER BY total_views DESC
LIMIT 5

Row  OwnerDisplayName  total_views
1    Joan Venge        25520387
2    Ray Vega          21576470
3    anon              19814224
4    Tim               19028260
5    John              17638812

Which tags receive the most views:

SELECT
  tag AS tags,
  SUM(ViewCount) AS views
FROM (
  SELECT
    ViewCount,
    tag
  FROM
    stackoverflow.posts,
    UNNEST(SPLIT(Tags, '|')) AS tag
  WHERE tag != ''
)
GROUP BY tags
ORDER BY views DESC
LIMIT 5

Row  tags        views
1    javascript  8190916894
2    python      8175132834
3    java        7258379211
4    c#          5476932513
5    android     4258320338

Aggregate functions

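Where possible, you should exploit ClickHouse aggregate functions. For example, the argMax function can compute the most viewed question of each year directly, whereas the BigQuery query below has to combine ARRAY_AGG with ORDER BY, LIMIT, and OFFSET to get the same result.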

SELECT
  EXTRACT(YEAR FROM CreationDate) AS Year,
  ARRAY_AGG(Title ORDER BY ViewCount DESC LIMIT 1)[OFFSET(0)]
    AS MostViewedQuestionTitle,
  MAX(ViewCount) AS MaxViewCount
FROM
  `stackoverflow.posts`
WHERE
  PostTypeId = 1
GROUP BY
  Year
ORDER BY
  Year ASC

Row  Year  MostViewedQuestionTitle                                                         MaxViewCount
1    2008  How to find the index for a given item in a list?                               6316987
2    2009  How do I undo the most recent local commits in Git?                             13962748
...  ...   ...                                                                             ...
16   2023  How do I solve "error: externally-managed-environment" every time I use pip 3?  506822
17   2024  Warning "Third-party cookie will be blocked. Learn more in the Issues tab"      66975
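
For readers unfamiliar with argMax, its semantics can be modeled in a few lines of Python: `argMax(arg, val)` returns the value of `arg` from the row where `val` is largest. The sketch below applies that per year. The rows are invented placeholders, and `most_viewed_per_year` is a hypothetical helper, not part of any library.

```python
# Hypothetical rows: (year, title, view_count)
posts = [
    (2008, "A", 10),
    (2008, "B", 30),
    (2009, "C", 7),
    (2009, "D", 5),
]


def most_viewed_per_year(rows):
    """Per year, keep the title of the row with the highest view count.

    This mirrors grouping by year and selecting
    argMax(Title, ViewCount) together with max(ViewCount).
    """
    best = {}
    for year, title, views in rows:
        if year not in best or views > best[year][1]:
            best[year] = (title, views)
    return dict(sorted(best.items()))


print(most_viewed_per_year(posts))
```

A single pass keeps one (title, views) pair per group, which is why no per-group sorting or array construction is needed.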

Conditionals and arrays

Conditional and array functions make queries significantly simpler. The following query computes the tags (with more than 10000 occurrences) with the largest percentage increase from 2022 to 2023. The BigQuery version below needs two CTEs; the ClickHouse equivalent is more succinct thanks to conditionals, array functions, and support for reusing aliases in the HAVING and SELECT clauses.

WITH posts_expanded AS (
  SELECT
    tag,
    EXTRACT(YEAR FROM CreationDate) AS year
  FROM
    stackoverflow.posts,
    UNNEST(SPLIT(Tags, '|')) AS tag
  WHERE
    tag != '' AND EXTRACT(YEAR FROM CreationDate) IN (2022, 2023)
),
tag_counts AS (
  SELECT
    tag,
    COUNTIF(year = 2023) AS count_2023,
    COUNTIF(year = 2022) AS count_2022
  FROM
    posts_expanded
  GROUP BY tag
)
SELECT
  tag,
  count_2023,
  count_2022,
  ((count_2023 - count_2022) / count_2022) * 100 AS percent_change
FROM
  tag_counts
WHERE
  count_2022 > 10000
  AND count_2023 > 10000
ORDER BY percent_change DESC
LIMIT 5

Row  tag          count_2023  count_2022  percent_change
1    next.js      13788       10520       31.06463878326996
2    spring-boot  16573       17721       -6.478189718413183
3    .net         11458       12968       -11.644046884639112
4    azure        11996       14049       -14.613139725247349
5    docker       13885       16877       -17.72826924216389
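
The logic of this query (split the tag string, count conditionally per year, filter on both counts, then rank by percentage change) can be checked with a small Python sketch. The input rows are invented toy data, the threshold is lowered to suit them, and `tag_growth` and `percent_change` are hypothetical helper names.

```python
from collections import Counter


def percent_change(new, old):
    """Same formula as the query: ((count_2023 - count_2022) / count_2022) * 100."""
    return (new - old) / old * 100


def tag_growth(posts, min_count):
    """posts: iterable of (tags, year), where tags is a '|'-separated string."""
    count_2022, count_2023 = Counter(), Counter()
    for tags, year in posts:
        for tag in filter(None, tags.split("|")):  # drop empty tags
            if year == 2022:
                count_2022[tag] += 1               # like COUNTIF(year = 2022)
            elif year == 2023:
                count_2023[tag] += 1               # like COUNTIF(year = 2023)
    rows = [
        (tag, count_2023[tag], count_2022[tag],
         percent_change(count_2023[tag], count_2022[tag]))
        for tag in count_2022
        if count_2022[tag] > min_count and count_2023[tag] > min_count
    ]
    return sorted(rows, key=lambda r: r[3], reverse=True)


# Hypothetical toy data; the real query uses a threshold of 10000.
posts = (
    [("|python|pandas|", 2022)] * 2
    + [("|python|", 2023)] * 3
    + [("|pandas|", 2023)] * 2
    + [("|rust|", 2023)]
)

print(tag_growth(posts, min_count=1))
print(percent_change(13788, 10520))  # counts from the next.js row above
```

Applying `percent_change` to the next.js counts from the result table gives roughly 31.06, which agrees with the reported percent_change.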