Getting Started with Doris (2)


View the CREATE TABLE help with:
HELP CREATE TABLE;

Full CREATE TABLE syntax:
    CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
    (column_definition1[, column_definition2, ...]
    [, index_definition1[, index_definition2, ...]])
    [ENGINE = [olap|mysql|broker]]
    [key_desc]
    [COMMENT "table comment"]
    [partition_desc]
    [distribution_desc]
    [rollup_index]
    [PROPERTIES ("key"="value", ...)]
    [BROKER PROPERTIES ("key"="value", ...)];
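For reference, a sketch that also exercises the optional ENGINE and partition clauses (hypothetical table, column, and partition names; adjust the ranges to your data):

CREATE TABLE IF NOT EXISTS example_db.range_demo
(
    dt DATE NOT NULL COMMENT "event date",
    siteid INT DEFAULT '10',
    pv BIGINT SUM DEFAULT '0'
)
ENGINE = olap
AGGREGATE KEY(dt, siteid)
PARTITION BY RANGE(dt)
(
    PARTITION p202007 VALUES LESS THAN ("2020-08-01"),
    PARTITION p202008 VALUES LESS THAN ("2020-09-01")
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");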
 
1) Create a table. In some multi-dimensional analysis scenarios the data has neither a primary key nor any aggregation requirement, so Doris provides the Duplicate data model for this case.
    The DUPLICATE KEY columns only determine the sort order:
    
CREATE TABLE IF NOT EXISTS example_db.duplicate_table
(
    aid INT NOT NULL COMMENT "article id",
    title VARCHAR(32) COMMENT "title",
    content VARCHAR(32) COMMENT "comment",
    uid VARCHAR(32) COMMENT "user id",
    tid VARCHAR(32) COMMENT "article type"
)
DUPLICATE KEY(aid)
DISTRIBUTED BY HASH(aid) BUCKETS 5
PROPERTIES("replication_num" = "1");

---- Run in Zeppelin:
%doris
CREATE TABLE IF NOT EXISTS example_db.duplicate_table2
(
    aid INT NOT NULL COMMENT "article id",
    title VARCHAR(32) COMMENT "title",
    content VARCHAR(32) COMMENT "comment",
    uid VARCHAR(32) COMMENT "user id",
    tid VARCHAR(32) COMMENT "article type"
)
DUPLICATE KEY(aid)
DISTRIBUTED BY HASH(aid) BUCKETS 5
PROPERTIES("replication_num" = "1");

--- After the table is created we can inspect it and its metadata through the MySQL client. The table is empty for now.
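For example (a minimal check; the exact output depends on your cluster):

DESC example_db.duplicate_table2;
SHOW CREATE TABLE example_db.duplicate_table2;
SELECT COUNT(*) FROM example_db.duplicate_table2;  -- 0 for now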

--- Next we load data in real time from Kafka, in JSON format.
--- For full details run HELP ROUTINE LOAD;

CREATE ROUTINE LOAD example_db.duplicate_insert2 ON duplicate_table2
        COLUMNS(aid,title,content,uid,tid)
        PROPERTIES
        (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "192.168.12.188:9092",
            "kafka_topic" = "doris_article2",
            "kafka_partitions" = "0",
            "kafka_offsets" = "OFFSET_BEGINNING"
        );
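With "format" = "json" and no jsonpaths, Doris matches the top-level JSON keys against the column names, so each Kafka message should look roughly like the record below (the sample values are made up; kafka-console-producer is one convenient way to feed the topic):

bin/kafka-console-producer.sh --broker-list 192.168.12.188:9092 --topic doris_article2
{"aid": 1, "title": "hello", "content": "first post", "uid": "u001", "tid": "t01"}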


--- Run the statement, then check on the job (output shown field by field):
SHOW ROUTINE LOAD;

Id: 10096
Name: duplicate_insert
CreateTime: 2020-08-14 11:49:50
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:example_db
TableName: duplicate_table
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"aid,title,content,uid,tid","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","maxBatchSizeBytes":"209715200","maxErrorNum":"0","currentTaskConcurrentNum":"1","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"doris_article","currentKafkaPartitions":"0","brokerList":"192.168.12.188:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":0,"errorRows":0,"committedTaskNum":0,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":1}
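To drill into a job's individual tasks (and find the error URL when rows are rejected), there is also:

SHOW ROUTINE LOAD TASK WHERE JobName = "duplicate_insert";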

--- Check in MySQL:
No data arrived; the job had errored out. Kill the job with:
STOP ROUTINE LOAD FOR duplicate_insert;
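Note that STOP is final: a stopped job cannot be restarted. To merely suspend a job while debugging, use the reversible pair:

PAUSE ROUTINE LOAD FOR duplicate_insert;
RESUME ROUTINE LOAD FOR duplicate_insert;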
        


---------- An earlier example that worked:

%doris
CREATE TABLE example_db.testing_table
(
    siteid INT DEFAULT '10',
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
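In the Aggregate model, rows with the same AGGREGATE KEY are merged and pv accumulates via SUM. A quick way to see this (sample values are made up):

INSERT INTO example_db.testing_table VALUES (1, 'alice', 10);
INSERT INTO example_db.testing_table VALUES (1, 'alice', 5);
-- The two rows collapse into one: (1, 'alice', 15)
SELECT * FROM example_db.testing_table;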

%doris
CREATE ROUTINE LOAD example_db.test_json_label ON testing_table
        COLUMNS(siteid,username,pv)
        PROPERTIES
        (
        "desired_concurrent_number"="1",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
        )
        FROM KAFKA
        (
        "kafka_broker_list" = "192.168.6.34:9092,192.168.6.35:9092,192.168.6.36:9092",
        "kafka_topic" = "doris_topic",
        "kafka_partitions" = "0",
        "kafka_offsets" = "0"
        );

--- Testing the Duplicate model again (note: duplicate_table/duplicate_table2 were created above with a different schema; DROP them first if they still exist)
 

 --- The docs-style form (backtick-quoted identifiers):
 CREATE TABLE example_db.duplicate_table
(
    `siteid` INT DEFAULT '10',
    `username` VARCHAR(32) DEFAULT '',
    `error_code` INT COMMENT "error code"
)
DUPLICATE KEY(`siteid`, `username`)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

CREATE ROUTINE LOAD example_db.duplicate_load ON duplicate_table
    COLUMNS(`siteid`,`username`,`error_code`)
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "test",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
    );


  --- The plain form (no backticks):
 CREATE TABLE example_db.duplicate_table2
(
    siteid INT DEFAULT '10',
    username VARCHAR(32) DEFAULT '',
    error_code INT COMMENT "error code"
)
DUPLICATE KEY(siteid, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");


CREATE ROUTINE LOAD example_db.aaa ON duplicate_table2
COLUMNS(siteid,username,error_code)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "ods_kafka",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
    );
    
---- Three-table example. These use the DUPLICATE model, so the raw data is stored as-is:

--article table
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_01
(
    aid INT NOT NULL COMMENT "article id",
    title VARCHAR(32) COMMENT "title",
    content VARCHAR(32) COMMENT "comment",
    uid VARCHAR(32) COMMENT "user id",
    tid VARCHAR(32) COMMENT "article type"
)
DUPLICATE KEY(aid,title)
DISTRIBUTED BY HASH(aid) BUCKETS 5
PROPERTIES("replication_num" = "1");


CREATE ROUTINE LOAD example_db.article ON duplicate_table_01
COLUMNS(aid,title,content,uid,tid)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "doris_article_test1",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
    );
    
    
    
--user table
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_02
(
    uid VARCHAR(32) COMMENT "user id",
    username VARCHAR(32) COMMENT "username",
    email VARCHAR(32) COMMENT "email"
)
DUPLICATE KEY(uid,username)
DISTRIBUTED BY HASH(uid) BUCKETS 5
PROPERTIES("replication_num" = "1");


CREATE ROUTINE LOAD example_db.user ON duplicate_table_02
COLUMNS(uid,username,email)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "doris_user_test1",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
    );
    
 
 

--type table
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_03
(
    tid VARCHAR(32) COMMENT "type id",
    typename VARCHAR(32) COMMENT "type name"
)
DUPLICATE KEY(tid,typename)
DISTRIBUTED BY HASH(tid) BUCKETS 5
PROPERTIES("replication_num" = "1");


CREATE ROUTINE LOAD example_db.type ON duplicate_table_03
COLUMNS(tid,typename)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "doris_type_test1",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);

--type2 table
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_04
(
    t_id VARCHAR(32) COMMENT "type id",
    typename VARCHAR(32) COMMENT "type name"
)
DUPLICATE KEY(t_id,typename)
DISTRIBUTED BY HASH(t_id) BUCKETS 5
PROPERTIES("replication_num" = "1");


CREATE ROUTINE LOAD example_db.type2 ON duplicate_table_04
COLUMNS(t_id,typename)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "doris_type2_test1",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);

--type3 table
CREATE TABLE IF NOT EXISTS example_db.duplicate_table_05
(
    tt_id VARCHAR(32) COMMENT "type id",
    typename VARCHAR(32) COMMENT "type name"
)
DUPLICATE KEY(tt_id,typename)
DISTRIBUTED BY HASH(tt_id) BUCKETS 5
PROPERTIES("replication_num" = "1");


CREATE ROUTINE LOAD example_db.type3 ON duplicate_table_05
COLUMNS(tt_id,typename)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
    (
    "kafka_broker_list" = "192.168.12.188:9092",
    "kafka_topic" = "doris_type3_test1",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);
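Once all the jobs are created, a quick sanity check that data is flowing (repeat the count for each target table):

SHOW ROUTINE LOAD;
SELECT COUNT(*) FROM example_db.duplicate_table_01;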
    
--Schema changes
        --Example: add a column to testing_table

ALTER TABLE testing_table ADD COLUMN uv BIGINT SUM DEFAULT '0' AFTER pv;

   
SHOW ALTER TABLE COLUMN; --- check progress:
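If the schema change needs to be aborted while it is still running, the matching cancel command is:

CANCEL ALTER TABLE COLUMN FROM testing_table;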


    --- ROLLUP (materialized index)
ALTER TABLE testing_table ADD ROLLUP rollup_id_name(siteid, username);

SHOW ALTER TABLE ROLLUP; --- check rollup progress



After the rollup is built, DESC table_name ALL shows the table's rollup information.

DESC testing_table ALL;


 

Cancel the currently running job with:
CANCEL ALTER TABLE ROLLUP FROM testing_table;

------ Join test with two tables:
--- Doris has two join strategies: Broadcast Join and Shuffle Join

SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 JOIN duplicate_table_04 WHERE duplicate_table_03.tid = duplicate_table_04.t_id;

--- Using the [broadcast] hint:
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 JOIN [broadcast] duplicate_table_04 WHERE duplicate_table_03.tid = duplicate_table_04.t_id;

--- Using the [shuffle] hint:
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 JOIN [shuffle] duplicate_table_04 WHERE duplicate_table_03.tid = duplicate_table_04.t_id;

--- LEFT JOIN works too; the standard join syntax is all supported:
SELECT duplicate_table_03.*,duplicate_table_04.* FROM duplicate_table_03 left JOIN [shuffle] duplicate_table_04 on duplicate_table_03.tid = duplicate_table_04.t_id;
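To confirm which strategy the planner actually chose, prepend EXPLAIN; the join node in the plan output reports the distribution mode:

EXPLAIN SELECT duplicate_table_03.*, duplicate_table_04.*
FROM duplicate_table_03 JOIN [shuffle] duplicate_table_04
ON duplicate_table_03.tid = duplicate_table_04.t_id;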


--- Multi-table join

SELECT duplicate_table_03.*,duplicate_table_04.*,duplicate_table_05.* 
FROM duplicate_table_03 
JOIN duplicate_table_04 
ON duplicate_table_03.tid=duplicate_table_04.t_id  
JOIN duplicate_table_05 
ON duplicate_table_04.t_id=duplicate_table_05.tt_id;
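The same query reads more cleanly with table aliases (equivalent result):

SELECT a.*, b.*, c.*
FROM duplicate_table_03 a
JOIN duplicate_table_04 b ON a.tid = b.t_id
JOIN duplicate_table_05 c ON b.t_id = c.tt_id;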