Doris初步使用2
使用命令查看创建表语句:
HELP CREATE TABLE;
创建表完整语句:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
(column_definition1[, column_definition2, ...]
[, index_definition1[, index_definition2, ...]])
[ENGINE = [olap|mysql|broker]]
[key_desc]
[COMMENT "table comment"]
[partition_desc]
[distribution_desc]
[rollup_index]
[PROPERTIES ("key"="value", ...)]
[BROKER PROPERTIES ("key"="value", ...)];
1)创建表 在某些多维分析场景下,数据既没有主键,也没有聚合需求。因此,我们引入 Duplicate 数据模型来满足这类需求
DUPLICATE KEY 是按字段排序:
-- Duplicate data model: rows are stored exactly as loaded; DUPLICATE KEY is
-- only a sort key, no deduplication or aggregation happens.
CREATE TABLE IF NOT EXISTS example_db.duplicate_table (
    aid     INT NOT NULL COMMENT "文章id",
    title   VARCHAR(32)  COMMENT "标题",
    content VARCHAR(32)  COMMENT "评论",
    uid     VARCHAR(32)  COMMENT "用户id",
    tid     VARCHAR(32)  COMMENT "文章类型"
)
DUPLICATE KEY(aid)                    -- sort order within each tablet
DISTRIBUTED BY HASH(aid) BUCKETS 5    -- 5 hash buckets on the article id
PROPERTIES("replication_num" = "1");  -- single replica: dev/test only
----在zeppelin里面执行 :
%doris
-- Duplicate-model article table used for the zeppelin demo; rows are kept
-- as-is (no primary key, no aggregation). Target of routine load below.
CREATE TABLE IF NOT EXISTS example_db.duplicate_table2
(
aid INT NOT NULL COMMENT "文章id",
title VARCHAR(32) COMMENT "标题",
content VARCHAR(32) COMMENT "评论",
uid VARCHAR(32) COMMENT "用户id",
tid VARCHAR(32) COMMENT "文章类型"
)
-- DUPLICATE KEY is a sort key only; duplicate rows are not merged.
DUPLICATE KEY(aid)
DISTRIBUTED BY HASH(aid) BUCKETS 5
-- replication_num=1: single replica, suitable only for a one-node test cluster.
PROPERTIES("replication_num" = "1");
---创建成功之后我们可以通过mysql 客户端查看到表的信息。目前表是空的
---我们需要导入数据,通过kafka实时导入,格式为json
---具体的细节信息通过 HELP ROUTINE LOAD;
-- Routine load job: continuously ingests JSON messages from Kafka topic
-- doris_article2 (partition 0, starting from the earliest offset) into
-- duplicate_table2, mapping the five listed JSON fields to table columns.
CREATE ROUTINE LOAD example_db.duplicate_insert2 ON duplicate_table2
COLUMNS(aid,title,content,uid,tid)
-- Tuning: up to 3 concurrent subtasks; a batch commits after at most 20 s,
-- 300000 rows, or 209715200 bytes; strict_mode=false keeps rows whose column
-- values fail conversion instead of aborting the load.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "doris_article2",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);
---执行语句,查看过程:
SHOW ROUTINE LOAD;
| 10096 | duplicate_insert | 2020-08-14 11:49:50 | N/A | N/A | default_cluster:example_db | duplicate_table | RUNNING | KAFKA | 1 | {"partitions":"*","columnToColumnExpr":"aid,title,content,uid,tid","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","maxBatchSizeBytes":"209715200","maxErrorNum":"0","currentTaskConcurrentNum":"1","maxBatchRows":"300000"} | {"topic":"doris_article","currentKafkaPartitions":"0","brokerList":"192.168.12.188:9092"} | {} | {"receivedBytes":0,"errorRows":0,"committedTaskNum":0,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":1}
---查看mysql
---结果没有数据,作业报错了:
STOP ROUTINE LOAD FOR duplicate_insert; ---杀掉该导入作业
---具体的细节信息通过 HELP ROUTINE LOAD;
----------之前正确案例
%doris
-- Aggregate data model: rows sharing the key (siteid, username) are merged,
-- and the SUM column pv is accumulated at load/compaction time.
CREATE TABLE example_db.testing_table
(
siteid INT DEFAULT '10',
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
-- Single replica: test-cluster setting.
PROPERTIES("replication_num" = "1");
%doris
-- Routine load of JSON records from Kafka topic doris_topic into the
-- aggregate table testing_table.
-- FIX: the COLUMNS list must name columns of the target table; the original
-- (category,price,author) does not exist on testing_table (siteid, username,
-- pv), so job creation would fail with an unknown-column error.
CREATE ROUTINE LOAD example_db.test_json_label ON testing_table
COLUMNS(siteid,username,pv)
-- 1 subtask; batch commits at 20 s / 300000 rows / 209715200 bytes;
-- non-strict mode tolerates bad column values.
PROPERTIES
(
"desired_concurrent_number"="1",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.6.34:9092,192.168.6.35:9092,192.168.6.36:9092",
"kafka_topic" = "doris_topic",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
---再次测试 duplicate 模型
---案例写法
-- Duplicate-model demo table (backtick-quoted identifier variant).
CREATE TABLE example_db.duplicate_table
(
`siteid` INT DEFAULT '10',
`username` VARCHAR(32) DEFAULT '',
`error_code` INT COMMENT "错误码"
)
-- Sort key only; rows are not deduplicated.
DUPLICATE KEY(`siteid`, `username`)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
-- Routine load: JSON messages from Kafka topic test (partition 0, offset 0)
-- into duplicate_table, mapping the three listed fields.
CREATE ROUTINE LOAD example_db.duplicate_load ON duplicate_table
COLUMNS(`siteid`,`username`,`error_code`)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "test",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
---正常写法
-- Duplicate-model demo table (unquoted-identifier variant of the one above).
CREATE TABLE example_db.duplicate_table2
(
siteid INT DEFAULT '10',
username VARCHAR(32) DEFAULT '',
error_code INT COMMENT "错误码"
)
DUPLICATE KEY(siteid, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
-- Routine load: JSON from Kafka topic ods_kafka into duplicate_table2.
-- NOTE(review): job label "aaa" is a throwaway name; consider a descriptive
-- label if this is kept (labels must be unique per database).
CREATE ROUTINE LOAD example_db.aaa ON duplicate_table2
COLUMNS(siteid,username,error_code)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "ods_kafka",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
----3个表案例,这里使用的是DUPLICATE模型,存入的是原始数据:
--article 表
-- Article table for the 3-table join demo (Duplicate model, raw rows kept).
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_01
(
aid INT NOT NULL COMMENT "文章id",
title VARCHAR(32) COMMENT "标题",
content VARCHAR(32) COMMENT "评论",
uid VARCHAR(32) COMMENT "用户id",
tid VARCHAR(32) COMMENT "文章类型"
)
-- Composite sort key (aid, title); no deduplication.
DUPLICATE KEY(aid,title)
DISTRIBUTED BY HASH(aid) BUCKETS 5
PROPERTIES("replication_num" = "1");
-- Routine load: JSON article records from Kafka topic doris_article_test1
-- (partition 0, offset 0) into duplicate_table_01.
CREATE ROUTINE LOAD example_db.article ON duplicate_table_01
COLUMNS(aid,title,content,uid,tid)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "doris_article_test1",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
--user 表
-- User table for the join demo (Duplicate model).
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_02
(
uid VARCHAR(32) COMMENT "用户id",
username VARCHAR(32) COMMENT "用户",
email VARCHAR(32) COMMENT "文章类型"
)
DUPLICATE KEY(uid,username)
DISTRIBUTED BY HASH(uid) BUCKETS 5
PROPERTIES("replication_num" = "1");
-- Routine load: JSON user records from Kafka topic doris_user_test1
-- (partition 0, offset 0) into duplicate_table_02.
CREATE ROUTINE LOAD example_db.user ON duplicate_table_02
COLUMNS(uid,username,email)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "doris_user_test1",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
--type 表
-- Article-type dimension table for the join demo (Duplicate model).
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_03
(
tid VARCHAR(32) COMMENT "tid id",
typename VARCHAR(32) COMMENT "类型名称"
)
DUPLICATE KEY(tid,typename)
DISTRIBUTED BY HASH(tid) BUCKETS 5
PROPERTIES("replication_num" = "1");
-- Routine load: JSON type records from Kafka topic doris_type_test1
-- (partition 0, offset 0) into duplicate_table_03.
CREATE ROUTINE LOAD example_db.type ON duplicate_table_03
COLUMNS(tid,typename)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "doris_type_test1",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
--type2 表
-- Second type table (t_id key) used as the right side of the join demos.
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_04
(
t_id VARCHAR(32) COMMENT "tid id",
typename VARCHAR(32) COMMENT "类型名称"
)
DUPLICATE KEY(t_id,typename)
DISTRIBUTED BY HASH(t_id) BUCKETS 5
PROPERTIES("replication_num" = "1");
-- Routine load: JSON records from Kafka topic doris_type2_test1
-- (partition 0, offset 0) into duplicate_table_04.
CREATE ROUTINE LOAD example_db.type2 ON duplicate_table_04
COLUMNS(t_id,typename)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "doris_type2_test1",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
--type3 表
-- Third type table (tt_id key) used in the multi-table join demo.
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_05
(
tt_id VARCHAR(32) COMMENT "tid id",
typename VARCHAR(32) COMMENT "类型名称"
)
DUPLICATE KEY(tt_id,typename)
DISTRIBUTED BY HASH(tt_id) BUCKETS 5
PROPERTIES("replication_num" = "1");
-- Routine load: JSON records from Kafka topic doris_type3_test1
-- (partition 0, offset 0) into duplicate_table_05.
CREATE ROUTINE LOAD example_db.type3 ON duplicate_table_05
COLUMNS(tt_id,typename)
-- 3 subtasks; batch commits at 20 s / 300000 rows / 209715200 bytes.
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.12.188:9092",
"kafka_topic" = "doris_type3_test1",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
--表结构变更
--案例:针对testing_table增加列
ALTER TABLE testing_table ADD COLUMN uv BIGINT SUM DEFAULT '0' after pv;
SHOW ALTER TABLE COLUMN; ---查看进度:
---rollup 物化索引
ALTER TABLE testing_table ADD ROLLUP rollup_id_name(siteid, username);
SHOW ALTER TABLE ROLLUP; ---查看rollup
Rollup 建立完成之后可以使用 DESC table_name ALL 查看表的 Rollup 信息。
DESC testing_table ALL;
可以使用以下命令取消当前正在执行的作业:
CANCEL ALTER TABLE ROLLUP FROM testing_table;
------两个表join 测试 :
---join 分为两种:Broadcast/Shuffle Join
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 JOIN duplicate_table_04 WHERE duplicate_table_03.tid = duplicate_table_04.t_id;
---使用[broadcast]
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 JOIN [broadcast] duplicate_table_04 WHERE duplicate_table_03.tid = duplicate_table_04.t_id;
---使用 [shuffle] Join:
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 JOIN [broadcast] duplicate_table_04 WHERE duplicate_table_03.tid = duplicate_table_04.t_id;
---使用left join 基本语法都支持
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 left JOIN [shuffle] duplicate_table_04 on duplicate_table_03.tid = duplicate_table_04.t_id;
---多表join
-- Three-way inner join chaining the type tables: 03 -> 04 on tid = t_id,
-- then 04 -> 05 on t_id = tt_id.
-- NOTE(review): no terminating semicolon is visible here — confirm the
-- statement is complete at the end of the file.
SELECT duplicate_table_03.*,duplicate_table_04.*,duplicate_table_05.*
FROM duplicate_table_03
JOIN duplicate_table_04
ON duplicate_table_03.tid=duplicate_table_04.t_id
JOIN duplicate_table_05
ON duplicate_table_04.t_id=duplicate_table_05.tt_id