Flink SQL in Practice: Using HBase with Flink SQL
 
×÷ÕߣºÓà°½
 2021-1-26
 
Editor's recommendation:
This article introduces two scenarios in which HBase and Flink SQL are used together. We hope it helps your learning.
This article comes from ApacheFlink and was edited and recommended by Alice.

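HBase, the open-source implementation of the Bigtable paper published by Google, is a distributed, column-oriented NoSQL database built on top of HDFS. It is well suited to large-scale real-time queries, which is why it is so widely used in real-time computing. You can write to HBase in real time, or use bulkload to have offline jobs generate HFiles and load them into HBase tables in one go. Flink SQL needs no introduction these days, and it provides a connector for HBase, so combining HBase with Flink SQL is well worth putting into practice.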

This article assumes the reader has some basic knowledge of HBase and does not cover HBase's architecture or internals in detail. Instead, it focuses on how HBase and Flink are used together in practice, in two scenarios: first, HBase as a dimension table in a temporal table join with a Flink Kafka table; second, writing the results of a Flink SQL computation to an HBase table for other users to query. The article covers the following:

· Preparing the HBase environment

· Preparing the data

· HBase as a dimension table in a temporal table join

· Writing Flink SQL computation results to HBase

· Summary

1. Preparing the HBase environment

Since no HBase test environment was available, and to avoid polluting the production HBase environment, I built my own HBase Docker image (you can pull it locally with docker pull guxinglei/myhbase). It is based on a clean official Ubuntu image, with HBase 2.2.0 and JDK 1.8 installed on top.

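Start the container, exposing the HBase web UI port and the built-in ZooKeeper port, so that we can inspect HBase from the web UI and so that the Flink HBase table can use the ZooKeeper connection information it needs. Note that with --network=host the container shares the host's network stack, so these ports are reachable directly and Docker ignores the -p mappings.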

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash

Enter the container, start the HBase cluster, and start the REST server; later we will use the REST API to read the data that Flink SQL writes into HBase.

# Start the HBase cluster
bin/start-hbase.sh
# Start the REST server in the background on port 8000
bin/hbase-daemon.sh start rest -p 8000

2. Preparing the data

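Because this HBase environment is a temporary single-node service set up for this article, it contains no data, so we need to write some data into it for the examples that follow. The second article in the Flink SQL in Practice series showed how to register a Flink MySQL table; we can extract the ad space table from it into an HBase table and use that as a dimension table for a temporal table join. To do so, we need to create a table in HBase and also create a Flink HBase table, and the two are linked through Flink SQL's HBase connector.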

Start the HBase shell in the container and create an HBase table named dim_hbase:

# Create the HBase table in the HBase shell
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase

Then create the Flink HBase table in Flink:

-- Register the Flink HBase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
rowkey STRING,
cf ROW < adspace_name STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'localhost:2181'
);
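The 'sink.buffer-flush.max-rows' = '1000' option makes the connector buffer mutations and write them to HBase in batches rather than one by one. The Python sketch below models only that row-count trigger; the real connector also flushes on a buffer-size threshold, a time interval, and, as far as I know, on checkpoints. BufferedSink, write, and flush are hypothetical names, not Flink's API.

```python
from typing import Callable, Dict, List, Tuple

# A pending mutation: (rowkey, {"family:qualifier": value bytes})
Mutation = Tuple[str, Dict[str, bytes]]


class BufferedSink:
    """Hypothetical model of a sink that batches mutations by row count."""

    def __init__(self, max_rows: int, flush_fn: Callable[[List[Mutation]], None]):
        self.max_rows = max_rows
        self.flush_fn = flush_fn
        self.buffer: List[Mutation] = []

    def write(self, mutation: Mutation) -> None:
        self.buffer.append(mutation)
        # Flush as soon as the buffer holds max_rows mutations
        if len(self.buffer) >= self.max_rows:
            self.flush()

    def flush(self) -> None:
        # Hand the whole batch to flush_fn, then start a new buffer
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []


batches: List[int] = []
sink = BufferedSink(max_rows=1000, flush_fn=lambda batch: batches.append(len(batch)))
for i in range(2500):
    sink.write((str(i), {"cf:adspace_name": b"ad"}))
sink.flush()  # in this model, a final flush drains the remainder
print(batches)  # [1000, 1000, 500]
```

Larger batches mean fewer round trips to the RegionServers at the cost of more buffered data that is not yet visible in HBase.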

With both the Flink MySQL table and the Flink HBase table in place, we can write the SQL job that extracts the data into HBase. The SQL statement and the job status are shown below:

-- Extract the MySQL data into the HBase table

insert into
hbase_dim_table
select
CAST (ID as VARCHAR),
ROW(name)
from
mysql_dim_table;
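The INSERT turns each MySQL row into one HBase row: CAST(ID AS VARCHAR) becomes the rowkey and ROW(name) fills the single cf:adspace_name cell. The Python sketch below mimics that mapping, assuming STRING values are stored as UTF-8 bytes (the connector's default charset); to_hbase_cells and the sample rows are hypothetical.

```python
from typing import Dict, Iterable, Tuple


def to_hbase_cells(rows: Iterable[Tuple[int, str]]) -> Dict[bytes, Dict[bytes, bytes]]:
    """Map (id, name) rows to {rowkey: {b"cf:adspace_name": value}}."""
    table: Dict[bytes, Dict[bytes, bytes]] = {}
    for ad_id, name in rows:
        # CAST(ID AS VARCHAR) -> rowkey, stored as UTF-8 bytes
        rowkey = str(ad_id).encode("utf-8")
        # ROW(name) -> field adspace_name of column family cf
        table[rowkey] = {b"cf:adspace_name": name.encode("utf-8")}
    return table


# Hypothetical sample rows standing in for mysql_dim_table
rows = [(2162210, "home_banner"), (42, "splash")]
table = to_hbase_cells(rows)
print(table[b"2162210"])  # {b'cf:adspace_name': b'home_banner'}
```

A later row with the same ID replaces the earlier entry here, which matches what a read of the latest cell version in HBase would return.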

3. HBase as a dimension table in a temporal join with Kafka

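Dimension table joins are unavoidable in Flink SQL: joining order amounts with an exchange-rate table, joining a click stream with an ad space detail table, and so on, across a wide range of use cases. As a distributed database that scales out horizontally and serves lookups by rowkey, HBase is better suited than MySQL to serve as the dimension table in such joins. In the second article of the Flink SQL in Practice series, we registered the ad click stream by registering a Kafka topic as a Flink Kafka table, and we also introduced how temporal table joins are used in Flink SQL. This section uses HBase as the dimension table instead. Since the previous section already extracted the data into HBase, we can write the temporal table join logic directly.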

The Flink Kafka table holding the ad click stream and the Flink HBase table holding the ad spaces are joined with a temporal table join on the ad space ID, outputting the ad space ID and the ad space's Chinese name. The SQL join logic is as follows:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;
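FOR SYSTEM_TIME AS OF ...procTime makes this a processing-time lookup join: for each click, Flink looks up the HBase row whose rowkey equals the ad space ID cast to a string, at the moment the click is processed, and the LEFT JOIN keeps clicks with no match and fills the name with NULL. The Python sketch below models those semantics with a dict standing in for HBase; lookup_left_join and the sample data are hypothetical, and the field name is taken from the SQL.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


def lookup_left_join(
    clicks: Iterable[Dict[str, object]],
    dim_table: Dict[str, str],
) -> Iterator[Tuple[object, Optional[str]]]:
    """Yield (adspace_id, adspace_name) for each click, looking up lazily."""
    for click in clicks:
        ad_id = click["publisher_adspace_adspaceId"]
        # cast(... as string) = hbase_dim_table.rowkey; None plays the role of NULL
        name = dim_table.get(str(ad_id))
        yield ad_id, name


dim = {"2162210": "home_banner"}
clicks = [
    {"publisher_adspace_adspaceId": 2162210},
    {"publisher_adspace_adspaceId": 999},
    {"publisher_adspace_adspaceId": 999},
]
joined = lookup_left_join(clicks, dim)
print(next(joined))  # (2162210, 'home_banner')
print(next(joined))  # (999, None): no matching rowkey, the click is kept
dim["999"] = "new_space"  # the dimension row appears later
print(next(joined))  # (999, 'new_space'): only clicks processed afterwards see it
```

Because the lookup happens per click at processing time, results already emitted are not revised when the dimension row changes later.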

The status of the temporal table join job after it was submitted to the Flink cluster, and the join results, are shown below:

4. Sinking computation results to HBase

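In the previous section, HBase served as a dimension table in a temporal table join, which is a very common scenario. Using HBase to store computation results is just as common: HBase is a distributed database whose underlying storage is HDFS with its multi-replica mechanism, so it is easy to maintain, easy to scale out, and fast for real-time queries, and it offers a variety of clients that make the stored data easy for downstream consumers to use. This section shows how Flink SQL writes computation results to HBase and how to query those results through the REST API.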

½øÈëÈÝÆ÷ÖУ¬ÔÚ HBase ÖÐн¨Ò»ÕÅ HBase ±í£¬Ò»¸ö column family ¾ÍÂú×ãÐèÇ󣬽¨±íÓï¾äÈçÏÂËùʾ£º

# Create the HBase sink table
create 'dwa_hbase_click_report','cf'

After creating the HBase table, we need to create a Flink HBase table in Flink SQL, and this time we must spell out the columns under the cf column family. The Flink Kafka table for the click stream was already registered in the second article of the Flink SQL in Practice series; in this section we compute the UV and the click count of the click stream, so the two columns are uv and click_count:

-- Register the Flink HBase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
rowkey STRING,
cf ROW < uv BIGINT, click_count BIGINT >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dwa_hbase_click_report',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'hostname:2181'
);
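The table declares uv and click_count as BIGINT. As far as I know, the connector serializes BIGINT the way HBase's Bytes.toBytes(long) does: 8 bytes, big-endian, two's complement. This matters when the values are read back through the REST API, because they come back as raw bytes rather than decimal text. A small Python sketch of that encoding; encode_bigint and decode_bigint are hypothetical helper names.

```python
import struct


def encode_bigint(value: int) -> bytes:
    # ">q": big-endian signed 64-bit, the layout of Bytes.toBytes(long)
    return struct.pack(">q", value)


def decode_bigint(raw: bytes) -> int:
    # Reject anything that is not exactly one 64-bit value
    if len(raw) != 8:
        raise ValueError(f"expected 8 bytes, got {len(raw)}")
    return struct.unpack(">q", raw)[0]


print(encode_bigint(42).hex())  # 000000000000002a
print(decode_bigint(encode_bigint(-1)))  # -1
```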

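With the click-stream Flink Kafka table, the HBase table for the results, and the corresponding Flink HBase table all ready, we compute UV and click counts over a 1-minute tumbling window and write the results to HBase. Anyone familiar with HBase knows that rowkey design has a major impact on how data is distributed across HBase regions. Our rowkey therefore reverses the ad space ID with Flink SQL's built-in REVERSE function and concatenates it with the window start time; reversing the ID moves its fastest-changing digits to the front, so consecutive IDs are spread across regions instead of concentrating writes on one. The SQL is as follows: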

INSERT INTO dwa_hbase_click_report
SELECT
CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,
'_',
CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
) as rowkey,
ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf
FROM
adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
TUMBLE(ets, INTERVAL '1' MINUTE),
publisher_adspace_adspaceId;
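To make the query concrete, the Python sketch below assigns each click to its 1-minute tumbling window (window start = timestamp rounded down to a multiple of 60,000 ms), computes uv as the number of distinct audience_mvid values and click_count as the number of impression IDs, and builds the rowkey as REVERSE(adspaceId) + '_' + window start in milliseconds. It works on epoch milliseconds directly, which sidesteps the session time zone that UNIX_TIMESTAMP(DATE_FORMAT(...)) may depend on in the SQL. The input tuple layout, the function names, and the sample clicks are assumptions.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

WINDOW_MS = 60_000  # INTERVAL '1' MINUTE


def window_start(ts_ms: int) -> int:
    # Tumbling windows are aligned to the epoch: round down to a multiple of the size
    return ts_ms - ts_ms % WINDOW_MS


def make_rowkey(adspace_id: int, start_ms: int) -> str:
    # CONCAT(REVERSE(CAST(id AS STRING)), '_', CAST(start_ms AS STRING))
    return str(adspace_id)[::-1] + "_" + str(start_ms)


def aggregate(clicks: Iterable[Tuple[int, int, str, str]]) -> Dict[str, Tuple[int, int]]:
    """clicks: (ets_ms, adspace_id, audience_mvid, impression_id), NULLs already filtered."""
    users: Dict[Tuple[int, int], Set[str]] = defaultdict(set)
    counts: Dict[Tuple[int, int], int] = defaultdict(int)
    for ets_ms, adspace_id, mvid, _impression_id in clicks:
        key = (window_start(ets_ms), adspace_id)  # GROUP BY TUMBLE(...), adspace_id
        users[key].add(mvid)  # COUNT(DISTINCT audience_mvid)
        counts[key] += 1  # COUNT(impressionId): every row has a non-null ID
    return {
        make_rowkey(ad, start): (len(users[(start, ad)]), counts[(start, ad)])
        for (start, ad) in counts
    }


clicks = [
    (1606295280000, 2162210, "u1", "i1"),
    (1606295299999, 2162210, "u1", "i2"),
    (1606295339999, 2162210, "u2", "i3"),
    (1606295340000, 2162210, "u2", "i4"),  # first millisecond of the next window
]
result = aggregate(clicks)
print(result)
# {'0122612_1606295280000': (2, 3), '0122612_1606295340000': (1, 1)}
```

The rowkey 0122612_1606295280000 is what the base64 startRow in the REST scanner request decodes to.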

The status of the SQL job after submission, and a check of the results, are shown below:

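The SQL job above has successfully written the computation results to HBase. For a production HBase service, many colleagues do not necessarily have HBase client permissions and therefore cannot read the data through the HBase shell; moreover, an online reporting service obviously cannot query data through the HBase shell. So in real-time reporting scenarios, data engineers write the data into HBase and front-end engineers read it through the REST API. We already started the HBase REST server process earlier, so we can read the data in HBase through the REST service.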

First, let's get one of the rows just written to HBase and take a look:

Now let's query the data in HBase through the REST API. Step 1: run the following command to obtain a scannerId. The rowkey to be queried must first be base64-encoded, and the returned results must later be base64-decoded.

curl -vi -X PUT \
-H "Accept: text/xml" \
-H "Content-Type: text/xml" \
-d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
"http://hostname:8000/dwa_hbase_click_report/scanner"

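Because startRow and endRow must be base64-encoded, it is convenient to build the request body programmatically. The Python sketch below encodes a rowkey with the standard library and produces the same XML body as the curl command above; b64 and scanner_body are hypothetical helpers.

```python
import base64
from xml.sax.saxutils import quoteattr


def b64(text: str) -> str:
    # Rowkeys are sent as base64 of their UTF-8 bytes
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def scanner_body(start_row: str, end_row: str) -> str:
    # quoteattr wraps each base64 value in double quotes for the XML attribute
    return (
        f"<Scanner startRow={quoteattr(b64(start_row))} "
        f"endRow={quoteattr(b64(end_row))}></Scanner>"
    )


rowkey = "0122612_1606295280000"
print(b64(rowkey))  # MDEyMjYxMl8xNjA2Mjk1MjgwMDAw
print(scanner_body(rowkey, rowkey))
```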
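Step 2: using the scanner ID returned by the previous command (the last segment of the URL in its Location response header), run the following command to query the data and view the returned results: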

curl -vi -X GET \
-H "Accept: application/json" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

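With Accept: application/json, the REST server returns rows under the keys Row, key, Cell, column, and $, where the rowkey, the column name (family:qualifier), and the cell value are all base64-encoded. The Python sketch below decodes such a payload and turns the listed BIGINT columns back into integers, assuming they are stored as 8-byte big-endian longs (the layout of HBase's Bytes.toBytes(long)). decode_rows is a hypothetical helper, and the sample payload is constructed for illustration, not real output.

```python
import base64
import json
import struct
from typing import Dict, Set


def decode_rows(payload: str, bigint_columns: Set[str]) -> Dict[str, Dict[str, object]]:
    """Decode an HBase REST JSON response into {rowkey: {column: value}}."""
    rows: Dict[str, Dict[str, object]] = {}
    for row in json.loads(payload).get("Row", []):
        key = base64.b64decode(row["key"]).decode("utf-8")
        cells: Dict[str, object] = {}
        for cell in row.get("Cell", []):
            column = base64.b64decode(cell["column"]).decode("utf-8")
            raw = base64.b64decode(cell["$"])
            # BIGINT columns: 8-byte big-endian signed long; others stay as bytes
            cells[column] = struct.unpack(">q", raw)[0] if column in bigint_columns else raw
        rows[key] = cells
    return rows


def enc(data: bytes) -> str:
    return base64.b64encode(data).decode("ascii")


# Hypothetical payload in the REST JSON shape, with made-up values uv=2, click_count=3
sample = json.dumps({"Row": [{
    "key": enc(b"0122612_1606295280000"),
    "Cell": [
        {"column": enc(b"cf:uv"), "timestamp": 1606295341000, "$": enc(struct.pack(">q", 2))},
        {"column": enc(b"cf:click_count"), "timestamp": 1606295341000, "$": enc(struct.pack(">q", 3))},
    ],
}]})
print(decode_rows(sample, {"cf:uv", "cf:click_count"}))
# {'0122612_1606295280000': {'cf:uv': 2, 'cf:click_count': 3}}
```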
Step 3: when the query is finished, run the following command to delete the scanner:

curl -vi -X DELETE \
-H "Accept: text/xml" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"
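The three steps form a create, read, delete lifecycle. Below is a Python sketch of it using only urllib.request. It assumes that the PUT responds with 201 Created and a Location header pointing at the new scanner, and that a GET on an exhausted scanner returns 204 No Content, which is how I understand the HBase REST server to behave. The base URL is the placeholder host from the curl commands, and all function names are hypothetical; running scan() needs a live REST server.

```python
import urllib.request
from typing import Iterator

BASE = "http://hostname:8000"  # placeholder host from the curl commands
TABLE = "dwa_hbase_click_report"


def create_scanner_request(body: str) -> urllib.request.Request:
    # Step 1: PUT the scanner definition
    return urllib.request.Request(
        f"{BASE}/{TABLE}/scanner",
        data=body.encode("utf-8"),
        method="PUT",
        headers={"Accept": "text/xml", "Content-Type": "text/xml"},
    )


def scanner_id_from_location(location: str) -> str:
    # ".../dwa_hbase_click_report/scanner/<id>" -> "<id>"
    return location.rstrip("/").rsplit("/", 1)[-1]


def next_batch_request(scanner_url: str) -> urllib.request.Request:
    # Step 2: GET the next batch of rows as JSON
    return urllib.request.Request(scanner_url, method="GET", headers={"Accept": "application/json"})


def delete_scanner_request(scanner_url: str) -> urllib.request.Request:
    # Step 3: DELETE the scanner to free it on the server
    return urllib.request.Request(scanner_url, method="DELETE", headers={"Accept": "text/xml"})


def scan(body: str) -> Iterator[bytes]:
    """Run the full lifecycle against a live server, yielding raw JSON batches."""
    with urllib.request.urlopen(create_scanner_request(body)) as resp:
        scanner_url = resp.headers["Location"]
    try:
        while True:
            with urllib.request.urlopen(next_batch_request(scanner_url)) as resp:
                if resp.status == 204:  # assumed: scanner exhausted
                    break
                yield resp.read()
    finally:
        # Always release the scanner, even if the caller stops early
        urllib.request.urlopen(delete_scanner_request(scanner_url)).close()
```

Wrapping the DELETE in finally mirrors step 3: a scanner that is never deleted keeps holding resources on the REST server until it times out.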

5. Summary

ÔÚ±¾ÆªÎÄÕÂÖУ¬ÎÒÃǽéÉÜÁË HBase ºÍ Flink SQL µÄ½áºÏʹÓñȽϹ㷺Á½Öֵij¡¾°£º×÷Ϊά¶È±íÓÃÒÔ¼°´æ´¢¼ÆËã½á¹û£»Í¬Ê±Ê¹Óà REST API ¶Ô HBase ÖеÄÊý¾Ý½øÐвéѯ£¬¶ÔÓÚ²éѯÓû§À´Ëµ£¬±ÜÃâÖ±½Ó±©Â¶ HBase µÄ zk£¬Í¬Ê±½« rest server ºÍ HBase ¼¯Èº½âñî¡£

   