ClickHouse یک پایگاه داده OLAP متنباز با عملکرد بسیار سریع است که برای تحلیلهای حجیم و دادههای زمانی طراحی شده است. یکی از ویژگیهای کلیدی ClickHouse، توانایی توزیع دادهها و اجرای کوئریها روی چند نود بهصورت موازی است. در این مقاله، به صورت عملی با این مکانیزم آشنا میشویم و نحوه ایجاد یک کلاستر سه نودی با Docker و استفاده از جدول Distributed را مرور میکنیم.
در ClickHouse دادهها میتوانند روی چند نود (Node) توزیع شوند تا علاوه بر افزایش ظرفیت ذخیرهسازی، زمان پاسخدهی کوئریها کاهش یابد. دو مفهوم کلیدی در این مکانیزم وجود دارد:
در این معماری، هر نود جدولهای MergeTree خود را دارد و دادهها روی شاردهای مختلف ذخیره میشوند.
برای اجرای کوئریها روی کلاستر، ClickHouse از جدول Distributed استفاده میکند:
برای تمرین عملی، میتوانیم یک کلاستر سه نودی با Docker و Docker Compose بسازیم. هر نود شامل:
version: '3.9'
services:
chnode1:
build: .
ports:
- "۸۱۲۳:۸۱۲۳"
- "۹۰۰۰:۹۰۰۰"
- "۹۱۸۱:۹۱۸۱"
- "۹۲۳۴:۹۲۳۴"
volumes:
- ./configs/config1:/etc/clickhouse-server/config.d
- ./logs/logs_1:/var/log/clickhouse-server/
- ./data:/var/lib/clickhouse/user_files
- ch1_data:/var/lib/clickhouse
networks:
- clickhouse_network
mem_limit: 2G
cpus: 1
chnode2:
build: .
ports:
- "۸۱۲۴:۸۱۲۳"
- "۹۰۰۱:۹۰۰۰"
- "۹۱۸۲:۹۱۸۱"
- "۹۲۳۵:۹۲۳۴"
volumes:
- ./configs/config2:/etc/clickhouse-server/config.d
- ./logs/logs_2:/var/log/clickhouse-server/
- ./data:/var/lib/clickhouse/user_files
- ch2_data:/var/lib/clickhouse
networks:
- clickhouse_network
mem_limit: 2G
cpus: 1
chnode3:
build: .
ports:
- "۸۱۲۵:۸۱۲۳"
- "۹۰۰۲:۹۰۰۰"
- "۹۱۸۳:۹۱۸۱"
- "۹۲۳۶:۹۲۳۴"
volumes:
- ./configs/config3:/etc/clickhouse-server/config.d
- ./logs/logs_3:/var/log/clickhouse-server/
- ./data:/var/lib/clickhouse/user_files
- ch3_data:/var/lib/clickhouse
networks:
- clickhouse_network
mem_limit: 2G
cpus: 1
networks:
clickhouse_network:
driver: bridge
volumes:
ch1_data:
ch2_data:
ch3_data:
همانطور که مشاهده میکنید، هر نود یک پوشه کانفیگ مجزا دارد (
configs/config1,configs/config2,configs/config3) که باعث میشود هر نود بتواند کانفیگ مخصوص خود را داشته باشد.
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<raft_configuration>
<server><id>1</id><hostname>chnode1</hostname><port>9234</port></server>
<server><id>2</id><hostname>chnode2</hostname><port>9234</port></server>
<server><id>3</id><hostname>chnode3</hostname><port>9234</port></server>
</raft_configuration>
</keeper_server>
</clickhouse>
این مقادیر برای هر نود متفاوت است و از طریق پوشه کانفیگ مخصوص همان نود ست میشوند.
<clickhouse>
<macros>
<shard>1</shard>
<replica>replica_1</replica>
</macros>
</clickhouse>
ساختار کلاستر با تعیین تعداد شاردها و رپلیکا و اینکه هر شارد و هر رپلیکا در کدام نود قرار گرفته اند، در این فایل تنظیم می شود و بین همه نودها مشترک است.
<clickhouse>
<remote_servers replace="true">
<cluster_3S_1R>
<shard>
<internal_replication>true</internal_replication>
<replica><host>chnode1</host><port>9000</port></replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica><host>chnode2</host><port>9000</port></replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica><host>chnode3</host><port>9000</port></replica>
</shard>
</cluster_3S_1R>
</remote_servers>
</clickhouse>
<clickhouse>
<zookeeper>
<node index="1"><host>chnode1</host><port>9181</port></node>
<node index="2"><host>chnode2</host><port>9181</port></node>
<node index="3"><host>chnode3</host><port>9181</port></node>
</zookeeper>
</clickhouse>
ابتدا یک جدول MergeTree روی هر نود ایجاد میکنیم (مثلاً trips_part) و دادهها را روی آن میریزیم:
CREATE TABLE rides.trips_part ON CLUSTER 'cluster_3S_1R' (
VendorID Int32,
tpep_pickup_datetime DateTime64(6),
...
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(tpep_pickup_datetime)
ORDER BY (tpep_pickup_datetime, PULocationID, DOLocationID)
SETTINGS index_granularity = 8192;
سپس یک جدول Distributed ایجاد میکنیم که کوئریها را روی نودها توزیع کند:
CREATE TABLE rides.trips_distributed ON CLUSTER 'cluster_3S_1R'
AS rides.trips_part
ENGINE = Distributed('cluster_3S_1R', 'rides', 'trips_part', toMonth(tpep_pickup_datetime) % 3);
همانطور که گفته شد، جدول Distributed هیچ دادهای ذخیره نمیکند و صرفاً کوئریها را هدایت و نتایج را جمعآوری میکند. در تعریف جدول توزیع شده نیاز به نام کلاستر، نام دیتابیس و نام جداول محلی و همچنین کلید شارد دارد که به ازای هر رکورد، اگر بخواهیم از طریق این جدول، درج انجام دهیم ، توزیع رکوردها بین نودها را از طریق آن انجام دهد. قبل از ساخت این جدول توزیع شده، این دیتابیس و جداول محلی باید ساخته شده باشند.
SELECT sum(total_amount), avg(total_amount), count(*) AS num_rows
FROM rides.trips_distributed
WHERE tpep_pickup_datetime >= '2023-05-01' AND tpep_pickup_datetime < '2023-06-01';
یکی از ویژگیهای کلیدی جدول Distributed، امکان توزیع دادهها هنگام درج (INSERT) روی نودهای مختلف بر اساس Shard Key است.
مثال جدول Distributed با Shard Key:
CREATE TABLE rides.trips_distributed ON CLUSTER 'cluster_3S_1R'
AS rides.trips_part
ENGINE = Distributed(
'cluster_3S_1R',
'rides',
'trips_part',
cityHash64(PULocationID) -- Shard Key
);
PULocationID به عنوان Shard Key استفاده شده است.cityHash64 مقدار ستون را هش میکند تا توزیع تقریبا یکنواخت دادهها بین نودها صورت گیرد. اینجا رکوردهای هر محله در یک شارد قرار میگیرند. به بیان ساده، Shard Key توزیع دادهها هنگام INSERT را کنترل میکند، اما هر تغییری در تعداد شاردها نیاز به بازتعریف جدول Distributed دارد تا معماری کلاستر همچنان صحیح باقی بماند.
از تغییر تعداد نودها بدون بازسازی Distributed Table پرهیز کنید، مگر اینکه بخواهید دادهها را مجدداً توزیع کنید.
هنگام طراحی Shard Key، ستونی را انتخاب کنید که پراکندگی دادهها بالایی دارد تا شاردها بهصورت متعادل بارگذاری شوند.