Editor's recommendation:
This article introduces two scenarios for using HBase together with Flink SQL. We hope it helps your learning.
This article comes from ApacheFlink and was edited and recommended by Alice.
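HBase, the open-source implementation of the Bigtable paper published by Google, is a distributed, column-oriented NoSQL database built on top of HDFS. It is well suited to large-scale real-time queries, which is why it is so widely used in real-time computing. Data can be written to HBase in real time, or an offline job can generate HFiles and bulk load them into an HBase table. Flink SQL hardly needs an introduction these days, and it also provides an HBase connector, so combining HBase with Flink SQL is well worth putting into practice.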
This article assumes some familiarity with HBase and does not go into its architecture or internals; it focuses on using HBase and Flink together in real scenarios. There are two main scenarios: first, HBase serves as a dimension table in a temporal table join with a Flink Kafka table; second, the results computed by Flink SQL are written to an HBase table for other users to query. The article therefore covers:
· HBase environment setup
· Data preparation
· HBase as a dimension table in a temporal table join
· Writing Flink SQL results to HBase
· Summary
1. HBase environment setup
Because there was no HBase test environment, and to avoid polluting the production HBase environment, I built my own HBase Docker image (you can pull it locally with docker pull guxinglei/myhbase). It is based on a clean official Ubuntu image, with HBase 2.2.0 and JDK 1.8 installed on top.
Start the container and expose the HBase web UI port and the built-in ZooKeeper port, so that we can view information in the web UI and so that the Flink HBase table can get the ZooKeeper connection information it needs.
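# Note: with --network=host, Docker ignores the -p mappings; the container's ports are used directly on the host
docker run -it --network=host -p 2181:2181 -p 60011:60011 \
  docker.io/guxinglei/myhbase:latest bash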
Inside the container, start the HBase cluster and the REST server, so that later we can use the REST API to read the data Flink SQL writes into HBase.
# Start the HBase cluster
bin/start-hbase.sh
# Start the REST server in the background on port 8000
bin/hbase-daemon.sh start rest -p 8000
2. Data preparation
Since this HBase environment is a temporary single-node setup with no data in it, we need to write some data for the later examples. The second article of the Flink SQL in practice series showed how to register a Flink MySQL table; we can extract the ad space table from it into an HBase table and use that as the dimension table for the temporal table join. So we need to create a table in HBase and also a Flink HBase table; the two are linked through Flink SQL's HBase connector.
Start the HBase shell in the container and create an HBase table named dim_hbase:
# Create the HBase table in the HBase shell
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase
Create the Flink HBase table in Flink:
-- Register the Flink HBase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
  rowkey STRING,                 -- maps to the HBase row key
  cf ROW<adspace_name STRING>,   -- column family cf, qualifier adspace_name
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'dim_hbase',
  'sink.buffer-flush.max-rows' = '1000',
  'zookeeper.quorum' = 'localhost:2181'
);
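To make the table mapping concrete, here is a minimal Python sketch of how one row of hbase_dim_table could land in HBase: the rowkey column becomes the HBase row key, the ROW-typed column cf becomes the column family, and each ROW field becomes a qualifier. It assumes values are serialized like HBase's Bytes.toBytes (UTF-8 for STRING, 8-byte big-endian for BIGINT); the helper names and the sample value are hypothetical, and NULL handling is not modeled.

```python
import struct
from typing import Dict, Tuple, Union

Value = Union[str, int]


def encode_value(value: Value) -> bytes:
    """Serialize like HBase Bytes.toBytes (assumed): STRING -> UTF-8, BIGINT -> 8-byte big-endian."""
    if isinstance(value, bool):
        raise TypeError("BOOLEAN is not covered by this sketch")
    if isinstance(value, int):
        return struct.pack(">q", value)
    if isinstance(value, str):
        return value.encode("utf-8")
    raise TypeError(f"unsupported type: {type(value).__name__}")


def to_hbase_cells(rowkey: str, family: str, fields: Dict[str, Value]) -> Tuple[bytes, Dict[bytes, bytes]]:
    """Map a Flink row (rowkey + one ROW column) to an HBase row key and {b'family:qualifier': value}."""
    cells = {
        f"{family}:{qualifier}".encode("utf-8"): encode_value(value)
        for qualifier, value in fields.items()
    }
    return rowkey.encode("utf-8"), cells


# Hypothetical row of hbase_dim_table: rowkey '2162210', cf.adspace_name 'some_adspace'
row_key, cells = to_hbase_cells("2162210", "cf", {"adspace_name": "some_adspace"})
```

The same mapping applies to the result table later on, where the ROW fields uv and click_count become the qualifiers cf:uv and cf:click_count.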
Now that both the Flink MySQL table and the Flink HBase table exist, we can write the SQL job that extracts the data into HBase. The SQL statement and the job status are shown below:
-- Extract the MySQL data into the HBase table
insert into hbase_dim_table
select
  CAST(ID AS VARCHAR),  -- the ad space ID becomes the rowkey
  ROW(name)             -- the name goes into cf:adspace_name
from mysql_dim_table;
3. HBase as a dimension table in a temporal join with Kafka
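Joins against dimension tables are unavoidable in Flink SQL: order amounts joined with an exchange-rate table, a click stream joined with an ad space detail table, and so on; the use cases are very broad. As a distributed database that scales out easily, HBase is better suited than MySQL to serve as the dimension table in such joins. In the second article of the Flink SQL in practice series, we registered the ad click stream Kafka topic as a Flink Kafka table and introduced temporal table joins in Flink SQL. This section uses HBase as the dimension table. Since the previous section already extracted the data into HBase, we can write the temporal table join logic directly.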
The Flink Kafka table holding the ad click stream and the Flink HBase table holding the ad spaces are joined on the ad space ID with a temporal table join, which outputs the ad space ID and the ad space's Chinese name. The SQL join logic is:
select
  adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
  hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
  on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;
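A processing-time temporal join behaves like a lookup: each click, as it arrives, reads the current state of the HBase table by rowkey, and the LEFT JOIN emits NULL when no row matches. The Python sketch below models this with a dict standing in for HBase; the function name and the data are hypothetical, and it ignores any lookup caching the connector may do.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


def lookup_left_join(
    click_adspace_ids: Iterable[int],
    dim_table: Dict[str, str],
) -> Iterator[Tuple[int, Optional[str]]]:
    """Model of a processing-time temporal LEFT JOIN against a key-value dimension table."""
    for adspace_id in click_adspace_ids:
        # ON CAST(publisher_adspace_adspaceId AS STRING) = hbase_dim_table.rowkey
        rowkey = str(adspace_id)
        # The lookup reads the table as it is *now*; a missing key gives NULL (None)
        yield adspace_id, dim_table.get(rowkey)


# Hypothetical dimension data standing in for the HBase table dim_hbase
dim = {"101": "banner_a"}
joined = lookup_left_join([101, 202, 101], dim)
first = next(joined)          # (101, 'banner_a')
second = next(joined)         # (202, None): no matching rowkey
dim["101"] = "banner_a_v2"    # the dimension row changes after the first click was joined
third = next(joined)          # (101, 'banner_a_v2'): later clicks see the new value
```

Because the lookup happens at processing time, a click that was already joined is not updated when the dimension row changes later; only subsequent clicks see the new value.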
The status of the temporal table join job on the Flink cluster and the join results are shown below:
4. Sinking computed results into HBase
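Using HBase as a dimension table in a temporal table join, as in the previous section, is very common, but so is using HBase to store computed results. After all, HBase is a distributed database whose underlying storage is HDFS with its multi-replica mechanism; it is easy to maintain and scale out, serves real-time queries quickly, and offers a variety of clients that make it easy for downstream consumers to use the data stored in it. This section shows how Flink SQL writes computed results into HBase and how to query those results through the REST API.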
Inside the container, create a new HBase table; a single column family is enough:
# Create the HBase sink table in the HBase shell
create 'dwa_hbase_click_report','cf'
After creating the HBase table, we need to create a corresponding Flink HBase table in Flink SQL, and this time we must spell out the columns under the cf column family. The second article of the Flink SQL in practice series already registered the click stream as a Flink Kafka table; in this section we compute the UV and the click count of the click stream, so the two columns are uv and click_count:
-- Register the Flink HBase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
  rowkey STRING,
  cf ROW<uv BIGINT, click_count BIGINT>,  -- qualifiers cf:uv and cf:click_count
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'dwa_hbase_click_report',
  'sink.buffer-flush.max-rows' = '1000',
  'zookeeper.quorum' = 'hostname:2181'
);
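With the click stream Flink Kafka table, the HBase table that stores the results, and the Flink HBase table all in place, we compute UV and click count over 1-minute tumbling windows and write the results to HBase. As anyone familiar with HBase knows, rowkey design strongly affects how data is distributed across HBase regions. Our rowkey therefore uses Flink SQL's built-in REVERSE function to reverse the ad space ID, so that sequentially assigned IDs, which share leading digits, are spread across regions instead of hot-spotting one, and concatenates it with the window start time in epoch milliseconds. The SQL logic is as follows: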
INSERT INTO dwa_hbase_click_report
SELECT
  -- rowkey: reversed ad space ID + '_' + window start in epoch milliseconds
  CONCAT(
    REVERSE(CAST(publisher_adspace_adspaceId AS STRING)),
    '_',
    CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
  ) AS rowkey,
  -- cf.uv = distinct users, cf.click_count = clicks
  ROW(COUNT(DISTINCT audience_mvid), COUNT(audience_behavior_click_creative_impressionId)) AS cf
FROM adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL
  AND audience_mvid IS NOT NULL
  AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
  TUMBLE(ets, INTERVAL '1' MINUTE),
  publisher_adspace_adspaceId;
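The window and rowkey logic of this job can be mirrored in plain Python, which helps predict which rowkey to query later. This sketch is batch-style (it returns all windows at once instead of emitting each when it closes), works directly in epoch milliseconds, and ignores the time-zone handling of DATE_FORMAT/UNIX_TIMESTAMP, so its window start matches the SQL only if the session time zone does not shift the value. The event tuple layout and function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Set, Tuple

WINDOW_MS = 60_000  # TUMBLE(ets, INTERVAL '1' MINUTE)

# Hypothetical event layout: (adspace_id, event_time_ms, audience_mvid, impression_id)
Event = Tuple[Optional[int], int, Optional[str], Optional[str]]


def window_start_ms(event_ms: int) -> int:
    """Start (epoch ms) of the 1-minute tumbling window containing event_ms."""
    return event_ms - event_ms % WINDOW_MS


def make_rowkey(adspace_id: int, win_start_ms: int) -> str:
    """REVERSE(CAST(id AS STRING)) || '_' || CAST(window_start_ms AS STRING)."""
    return f"{str(adspace_id)[::-1]}_{win_start_ms}"


def click_report(events: Iterable[Event]) -> Dict[str, Tuple[int, int]]:
    """rowkey -> (uv, click_count), mirroring the GROUP BY above."""
    users: Dict[str, Set[str]] = defaultdict(set)
    clicks: Dict[str, int] = defaultdict(int)
    for adspace_id, event_ms, mvid, impression_id in events:
        # WHERE ... IS NOT NULL on all three columns
        if adspace_id is None or mvid is None or impression_id is None:
            continue
        key = make_rowkey(adspace_id, window_start_ms(event_ms))
        users[key].add(mvid)   # COUNT(DISTINCT audience_mvid)
        clicks[key] += 1       # COUNT(audience_behavior_click_creative_impressionId)
    return {key: (len(users[key]), clicks[key]) for key in clicks}
```

For example, ad space 2162210 in the window starting at 1606295280000 yields the rowkey 0122612_1606295280000, which is the key whose base64 form appears in the REST scanner request later in this section.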
The status of the SQL job after submission and a check of the results are shown below:
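The SQL job has successfully written the computed results into HBase. For a production HBase service, many colleagues do not necessarily have HBase client permissions and therefore cannot read the data through the HBase shell; and an online reporting service obviously cannot query data through the HBase shell either. So in real-time reporting scenarios, data engineers write the data into HBase and front-end engineers read it through the REST API. We already started the HBase REST server earlier, so we can read the data in HBase through the REST service.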
First, let's get one of the rows just written to HBase and take a look:
Now let's query the data in HBase through the REST API. Step 1: run the following statement to obtain a scanner ID. The rowkey to query must first be base64-encoded, and the values in the result must later be base64-decoded.
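# startRow/endRow are the base64 encoding of the rowkey 0122612_1606295280000
# The new scanner's URL, ending in the scanner ID, is returned in the Location response header
curl -vi -X PUT \
  -H "Accept: text/xml" \
  -H "Content-Type: text/xml" \
  -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
  "http://hostname:8000/dwa_hbase_click_report/scanner"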
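The startRow and endRow attributes in that request body are simply the base64-encoded rowkey. A small Python sketch that builds the same body from a plain rowkey (the function names are hypothetical):

```python
import base64
from xml.sax.saxutils import quoteattr


def b64_rowkey(rowkey: str) -> str:
    """Base64-encode a rowkey as required by the Scanner XML attributes."""
    return base64.b64encode(rowkey.encode("utf-8")).decode("ascii")


def scanner_body(start_row: str, end_row: str) -> str:
    """XML body for PUT /<table>/scanner."""
    return (
        f"<Scanner startRow={quoteattr(b64_rowkey(start_row))} "
        f"endRow={quoteattr(b64_rowkey(end_row))}></Scanner>"
    )


body = scanner_body("0122612_1606295280000", "0122612_1606295280000")
```

quoteattr wraps each value in double quotes; base64 output never contains quotes or XML special characters, so no escaping is triggered.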
Step 2: use the scanner ID returned by the previous statement to query the data and see the results:
curl -vi -X GET \
  -H "Accept: application/json" \
  "http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"
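In the JSON the scanner returns, the row key, the column name (family:qualifier) and the cell value ($) are all base64-encoded. The sketch below decodes them. The payload shape follows HBase REST's JSON CellSet (Row, key, Cell, column, $); as_bigint assumes the BIGINT columns uv and click_count were written as 8-byte big-endian longs (HBase's Bytes.toBytes(long)), which is an assumption about the connector's encoding. Function names are hypothetical.

```python
import base64
import json
import struct
from typing import Dict


def _unb64(text: str) -> bytes:
    return base64.b64decode(text)


def decode_cellset(payload: str) -> Dict[str, Dict[str, bytes]]:
    """Decode a JSON CellSet into {rowkey: {'family:qualifier': raw value bytes}}."""
    rows: Dict[str, Dict[str, bytes]] = {}
    for row in json.loads(payload).get("Row", []):
        key = _unb64(row["key"]).decode("utf-8")
        rows[key] = {
            _unb64(cell["column"]).decode("utf-8"): _unb64(cell["$"])
            for cell in row.get("Cell", [])
        }
    return rows


def as_bigint(raw: bytes) -> int:
    """Read an 8-byte big-endian signed long (assumed encoding of BIGINT columns)."""
    return struct.unpack(">q", raw)[0]
```

String columns such as cf:adspace_name would instead be decoded with raw.decode("utf-8").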
Step 3: when you have finished querying, delete the scanner:
curl -vi -X DELETE \
  -H "Accept: text/xml" \
  "http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"
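The three steps can also be scripted. The Python sketch below uses only the standard library: it creates a scanner, reads JSON batches until the server answers 204 No Content (which HBase REST uses to signal an exhausted scanner), and always deletes the scanner afterwards. BASE_URL reuses the placeholder host from the curl commands; the function names are hypothetical and error handling is kept minimal.

```python
import urllib.request
from typing import List

BASE_URL = "http://hostname:8000"  # placeholder host, as in the curl examples


def scanner_id_from_location(location: str) -> str:
    """The Location header of the PUT response ends with the scanner ID."""
    return location.rstrip("/").rsplit("/", 1)[-1]


def scan_json_batches(table: str, scanner_xml: str) -> List[str]:
    """Create a scanner, read JSON batches until exhausted, then delete it."""
    # Step 1: create the scanner (PUT <table>/scanner with an XML body)
    create = urllib.request.Request(
        f"{BASE_URL}/{table}/scanner",
        data=scanner_xml.encode("utf-8"),
        method="PUT",
        headers={"Accept": "text/xml", "Content-Type": "text/xml"},
    )
    with urllib.request.urlopen(create) as resp:
        scanner_id = scanner_id_from_location(resp.headers["Location"])
    scanner_url = f"{BASE_URL}/{table}/scanner/{scanner_id}"

    batches: List[str] = []
    try:
        # Step 2: GET the scanner repeatedly; 204 No Content means no more rows
        while True:
            fetch = urllib.request.Request(scanner_url, headers={"Accept": "application/json"})
            with urllib.request.urlopen(fetch) as resp:
                if resp.status == 204:
                    break
                batches.append(resp.read().decode("utf-8"))
    finally:
        # Step 3: always release the scanner on the server
        delete = urllib.request.Request(scanner_url, method="DELETE", headers={"Accept": "text/xml"})
        urllib.request.urlopen(delete).close()
    return batches
```

Deleting in a finally block mirrors step 3: scanners hold server-side resources, so they should be released even if reading fails midway.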
5. Summary
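In this article, we covered two widely used scenarios for combining HBase with Flink SQL: HBase as a dimension table, and HBase as storage for computed results. We also queried the data in HBase through the REST API, which avoids exposing HBase's ZooKeeper address to query users and decouples the REST server from the HBase cluster.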