[Feature][hbase] New features in SQL mode #1886

Open

libailin opened this issue Mar 1, 2024 · 0 comments · May be fixed by #1891
Labels
feature-request this is a feature requests on the product

Comments

libailin commented Mar 1, 2024

Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

New features in SQL mode:
1. Support configuring rowkeyExpress when writing.
2. Support configuring the HBase timestamp when writing; options: current time (default), a specified time column (e.g. columnFamily:columnName), or a specific fixed time.
3. Support configuring the number of columns the client reads from the server per RPC when reading; unlimited by default.
4. Support configuring the start rowKey, the end rowKey, and whether the rowkey is BytesBinary when reading.
5. Fix the missing Hbase2DynamicTableFactory in SQL mode.
6. Support the multiVersionFixedColumn mode for HBase (vertical-table reading).
For vertical-table reading, the declared source table may only have four fields, with the fixed types shown in the declaration below:

CREATE TABLE source_hbase
(
    `rowkey` VARCHAR,
    `family_qualifier` VARCHAR,
    `timestamp` BIGINT,
    `value` VARCHAR
) 

7. Support configuring the Hadoop user name, resolving HBase read/write permission issues.
8. Support configuring the write mode for null field values: SKIP (skip, do not write the field) or EMPTY (write an empty byte array instead).
9. Fix the error reported when the downstream query does not declare/use the rowkey field (see the sketch after the stack trace below):

Caused by: java.lang.UnsupportedOperationException: No implementation provided for SupportsProjectionPushDown. Please implement SupportsProjectionPushDown#applyProjection(int[][], DataType)
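
For illustration, a hedged sketch of the kind of query that used to hit this exception: a projection that does not select the rowkey column, run against the source_hbase table declared in the use case below (hypothetical query, shown only to illustrate the code path).

SELECT cf.item_id, cf.behavior FROM source_hbase;

With SupportsProjectionPushDown#applyProjection implemented, such projections are applied instead of throwing.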

Use case


---------------------------------------------------
-- HBase shell commands:
-- Run `hbase shell` to enter the HBase shell
-- list                                        : list all tables
-- describe 'tableName'                        : show the table structure
-- scan 'tableName'                            : show all rows of the table (full scan)
-- count 'tableName'                           : count the rows
-- alter 'tableName',{NAME=>'cf',VERSIONS=>3}  : alter the table so that the 'cf' column family stores 3 VERSIONS of column data
-- scan 'tableName',{VERSIONS=>5}              : scan showing up to 5 versions per cell
---------------------------------------------------
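
-- For reference, a hypothetical shell command that creates the table used below
-- (assumes a single column family named 'cf', matching the DDLs in this example):
-- create 'test_hbase', {NAME=>'cf', VERSIONS=>5}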

CREATE TABLE source_stream
(
    rowkey VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'stream-x'
    ,'number-of-rows' = '1'
);

CREATE TABLE source_hbase
(
    rowkey VARCHAR,
    cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3)),
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
      'connector' = 'hbase2-x'
      ,'zookeeper.quorum' = 'xxx:2181'
      ,'zookeeper.znode.parent' = '/hbase'
      -- String literal used to represent null values. Default: "null"
      ,'null-string-literal' = 'null'
      -- Table name; names with a namespace are supported, in the form namespace:table (namespace and table name joined by a colon)
      ,'table-name' = 'test_hbase'
      -- Start rowKey. Default: none
      ,'start-row-key' = 'a'
      -- End rowKey. Default: none
      ,'end-row-key' = 'c'
      -- Whether the rowkey is BytesBinary. Default: false
      ,'is-binary-row-key' = 'false'
      -- Maximum number of rows the client fetches per RPC. Default: 1000
      ,'scan-cache-size' = '1000'
      -- Number of columns the client reads from the server per RPC. Default: -1 (unlimited)
--       ,'scan-batch-size' = '2'

      -- Hadoop user to pass in, avoiding permission problems when reading/writing HBase
      ,'properties.hadoop.user.name' = 'hdp-test'
      -- Read mode for HBase: normal or multiVersionFixedColumn. Default: normal
      ,'mode' = 'multiVersionFixedColumn'
      -- Number of versions the HBase reader reads in multi-version mode; only -1 or a number greater than 1 is allowed, where -1 means all versions.
      ,'max-version' = '5'

      -- HBase Kerberos configuration
      , 'properties.hbase.security.authorization' = 'Kerberos'
      , 'properties.hbase.security.authentication' = 'Kerberos'
      , 'properties.hbase.security.auth.enable' = 'true'
      -- Kerberos files fetched from a remote SFTP directory
      ,'properties.remoteDir' = '/data/kerberos'
      ,'properties.java.security.krb5.conf' = 'krb5.conf'
      ,'properties.principalFile' = 'hbase.keytab'
      ,'properties.sftpConf' = '{"username":"xxx", "password":"xx", "host":"xxx", "port":"22"}'
      ,'properties.principal' = 'xx/xxx@EXAMPLE.COM'
      -- The following parameter fixes: Can't get Master Kerberos principal for use as renewer
      ,'properties.yarn.resourcemanager.principal' = 'xx/xxx@EXAMPLE.COM'
      -- The following parameter fixes: org.apache.hadoop.hdfs.DFSClient - Failed to connect to /xxxx for block, add to deadNodes and continue. java.io.IOException: An existing connection was forcibly closed by the remote host.
      -- It also fixes: INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Failed to read expected SASL data transfer protection handshake from client at /xxxxx. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection
      ,'properties.dfs.data.transfer.protection' = 'authentication'
);

CREATE TABLE sink_hbase
(
    rowkey VARCHAR,
    cf ROW(item_id VARCHAR, category_id string, behavior VARCHAR, ts TIMESTAMP(3))
    ,PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase2-x'
    -- ZooKeeper address
    ,'zookeeper.quorum' = 'xxxx:2181'
    -- HBase root znode in ZooKeeper
    ,'zookeeper.znode.parent' = '/hbase'
    -- Maximum in-memory size of buffered rows per write request. This improves HBase write performance but may increase latency. Set to '0' to disable. Default: 2mb
--     ,'sink.buffer-flush.max-size' = '1000'
    -- Maximum number of rows buffered per write request. This improves HBase write performance but may increase latency. Set to '0' to disable. Default: 1000
    ,'sink.buffer-flush.max-rows' = '1000'
    -- Flush interval for batched writes, in milliseconds. Default: 10000
    ,'sink.buffer-flush.interval' = '2000'
    -- Table name
    ,'table-name' = 'test_hbase'

    -- Expression used to build the rowkey, given as a string of the form $(cf:col);
    -- multiple fields can be combined: $(cf:col1)_$(cf:col2), and the md5 function can be used: md5($(cf:col))
    ,'rowkey-express' = 'md5($(cf:item_id)_$(cf:category_id))'
    -- Description: specifies the timestamp written to HBase. Options: current time, a specified time column, or a specified fixed time; choose one of the three.
    -- Time column given as columnFamily:columnName
    ,'version-column-name' = 'cf:behavior'
    -- A specific fixed time
--     ,'version-column-value' = '2024-02-23 10:10:10'
    -- Write mode when a field value is null. Options: SKIP (skip, do not write the field), EMPTY (write an empty byte array instead). Default: SKIP
    ,'null-mode' = 'SKIP'
);

CREATE TABLE sink_stream
(
    rowkey VARCHAR,
--     cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))

    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)

) WITH (
      'connector' = 'stream-x'
);


-- Test writing data to HBase
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from source_stream;

-- Write with a specific rowkey value
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from (select
--           CAST('a' as string) as rowkey,
--           CAST('a-item_id' as string) as item_id,
--           CAST('a-category_id' as string) as category_id,
--           CAST('a-behavior' as string) as behavior,
--           CAST('2024-02-21 17:10:10' as timestamp(3)) as ts);


-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from (select
--           CAST('a' as string) as rowkey,
--           CAST('a-item_id-7' as string) as item_id,
-- --           CAST('1708654215000' as string) as category_id,
--           CAST('a-category_id-7' as string) as category_id,
--           CAST('2024-02-23 10:10:15' as string) as behavior,
--           CAST('2024-02-23 10:10:13' as timestamp(3)) as ts);

-- Test reading data from HBase
-- insert into sink_stream select * from source_hbase;
insert into sink_stream select rowkey, cf.item_id, cf.category_id, cf.behavior, cf.ts from source_hbase;
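
-- A hedged sketch of a vertical-table read in multiVersionFixedColumn mode, using the fixed
-- four-column schema from point 6 of the description. Table name, quorum and the sink are assumptions
-- mirroring the example above; this is an illustration, not a tested job, so it is left commented out.
-- CREATE TABLE source_hbase_vertical
-- (
--     `rowkey` VARCHAR,
--     `family_qualifier` VARCHAR,
--     `timestamp` BIGINT,
--     `value` VARCHAR
-- ) WITH (
--       'connector' = 'hbase2-x'
--       ,'zookeeper.quorum' = 'xxx:2181'
--       ,'zookeeper.znode.parent' = '/hbase'
--       ,'table-name' = 'test_hbase'
--       ,'mode' = 'multiVersionFixedColumn'
--       -- -1 reads all versions
--       ,'max-version' = '-1'
-- );
-- insert into some_vertical_sink select `rowkey`, family_qualifier, `timestamp`, `value` from source_hbase_vertical;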


Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@libailin libailin added the feature-request this is a feature requests on the product label Mar 1, 2024
libailin added a commit to libailin/chunjun that referenced this issue Mar 18, 2024
@libailin libailin linked a pull request Mar 18, 2024 that will close this issue
libailin added a commit to libailin/chunjun that referenced this issue Mar 26, 2024