ʵʱÊý¾Ý¸´ÖƼ¼ÊõÔÚÒøÐС¢µçÐÅ¡¢±£ÏÕ¡¢ÕþÎñºÍµçÉ̵ÈÁìÓòÓ¦Ó÷dz£¹ã·º¡£

±ÈÈçÒøÐÐÁìÓòµÄÊÕµ¥ÒµÎñÉæ¼°ÊÕµ¥ÐС¢ÒøÐп¨×éÖ¯¼°·¢¿¨ÐеÄÊý¾Ýͬ²½¡£ÊÕµ¥ÐеÄÊý¾ÝÐèÒª´«Êäµ½ÒøÐп¨×éÖ¯£¬ÔÙÓÉÒøÐп¨×éÖ¯´«Ê䏸·¢¿¨ÐС£
Èç¹ûÊÕµ¥ÒµÎñ²»ÄÜ×öµ½Õ⼸µã£¬Ôò»á³öÏÖ½¨ÉèÒøÐеÄPOS»úÖ»ÄÜË¢½¨ÉèÒøÐеĿ¨£¬ÕÐÉÌÒøÐеÄPOS»úÖ»ÄÜË¢ÕÐÉÌÒøÐеĿ¨µÄÇé¿ö¡£»òÕßÓû§Ë¢¿¨Íê±Ïºó£¬É̼ÒÐèÒªµç»°Ñ¯ÎÊÊÕµ¥ÐÐÓû§ÊÇ·ñÓÐ×ã¹»Óà¶î£¬ÊÕµ¥ÐÐÔÙȥѯÎÊÒøÐп¨×éÖ¯£¬ÒøÐп¨×éÖ¯ÔÙȥѯÎÊ·¢¿¨ÐС£ÕâÑùÐèÒªÏûºÄ´óÁ¿É豸¡¢ÈËÁ¦ºÍʱ¼ä£¬Ôì³ÉÓû§ÌåÑéϽµ£¬´ìÉËÓû§µÄÓÿ¨»ý¼«ÐÔ¡£
±ÈÈçСÆû³µÎ¥Õ´¦ÀíÒµÎñÐèÒªÔÚÈ«³µ¸÷µØ³µ¹ÜËùͬ²½Î¥Õ¼Ç¼ºÍ´¦ÀíÇé¿öµÈÊý¾Ý£¬±ãÓÚ¼ÝÊ»ÈËÔÚÈ«¹ú¸÷µØ´¦Àí¡£ÕâЩҵÎñ³¡¾°¾ùÐèÒªÊý¾Ý¼°Ê±¡¢¿É¿¿ºÍÎȶ¨µÄ½øÐд«ÊäºÍ½»»»¡£
Èç¹ûСÆû³µÎ¥Õ´¦ÀíÒµÎñ²»ÄÜ×öµ½Õ⼸µã£¬Ôò¼ÝÊ»ÈËÖ»ÄÜȥΥÕµػò³µÁ¾µÇ¼ÇµØ´¦Àí£¬¶ÔÓÚÒìµØÎ¥Õ´¦Àí³É±¾¼«¸ß¡£µçÉÌϵͳÖеÄÓû§·ÖÎöÒµÎñÐèÒª´Ó´óÁ¿µÄ·Ö±íÖÐÌáÈ¡Óû§Êý¾Ý£¬Í¬²½µ½´óÊý¾Ýƽ̨½øÐзÖÎö£¬ÆäÒµÎñÁ¿·Ç³£´ó£¬´«Í³µÄÊý¾Ý³éÈ¡¶ÔÒµÎñϵͳӰÏ켫´ó¡£
ÔÚÒÔÉϵÄÒµÎñ³¡¾°ÖУ¬Êý¾Ý½»»»Éæ¼°ÒìµØ¡¢ÒìÖʺÍÒì¹¹µÈÒòËØÓ°Ïì¡£ÒìµØÖ¸Êý¾Ý¿âÍùÍùÃÇÓÚ²»Í¬µÄÊý¾ÝÖÐÐÄ£¬Éæ¼°¿É¿¿ÐÔ¡¢°²È«ÐÔºÍÎȶ¨ÐÔµÄÓ°Ïì¡£ÒìÖÊÖ¸²»Í¬µÄÊý¾Ý¿âÀàÐÍ£¬Éæ¼°µ×²ã´æ´¢ºÍ´¦Àí¼¼Êõ²îÒìµÄÓ°Ï죻Òì¹¹Ö¸²»Í¬µÄÊý¾Ý¿â½á¹¹£¬Éæ¼°Êý¾ÝÖØ¹¹µÄÓ°Ï죬¶øÇҺܶàÐèÇó¶¼ÊÇÔÚϵͳÉÏÏߺó²úÉúµÄ£¬ºÜ¶àÔÒò¾ö¶¨²»ÄܶÔÓ¦ÓÃϵͳ½øÐдóÁ¿Öع¹¡£
Òò´Ë£¬ÊµÊ±Êý¾Ý½»»»µÄÀ§Äѷdz£´ó£¬ÄÇô£¬ÎÒÃÇÓ¦¸ÃÈçºÎ½â¾öÄØ£¿
Ò».¡°Êý¾ÝËíµÀ¡±
ÎÒÃÇÔÚʵ¼ùÖУ¬ÓÃÁËеÄ˼·À´½â¾öÕâЩÎÊÌâ¡£°Ù·Öµã¡°Êý¾ÝËíµÀ¡±¾ÍÏñʯÓ͹ܵÀÒ»Ñù£¬Ê¯ÓÍÔ´Ô´²»¶ÏµØ´ÓÓ;®Á÷Ïò¸÷µØ£¬¶ø²»ÓÃʹÓÃÂÖ´¬¡¢»ð³µºÍÆû³µµÈ½øÐÐÔËÊä¡£

¡°Êý¾ÝËíµÀ¡±¼¼ÊõµÄÄ¿±êÊÇͨ¹ý¼òµ¥ÅäÖü´¿ÉÍê³ÉMySQL¡¢Oracle¡¢SQL
ServerºÍDB2µÈÊý¾Ý¿âµÄÔöÁ¿Êý¾Ý²¶»ñÈÎÎñ£¬´«Ê䏸ÍⲿϵͳÍê³ÉÊý¾ÝµÄʵʱºÍÅúÁ¿Ïû·Ñ£¬Âú×㼫¸ßµÄÒ»ÖÂÐÔ¡¢¼°Ê±ÐԺͿɿ¿ÐÔµÄÊý¾Ý½Ó¿Ú´«ÊäÒªÇó¡£

¡°Êý¾ÝËíµÀ¡±ÕûÌå¼Ü¹¹Í¼ ¾ßÌåÀ´Ëµ£¬Ä¿Ç°°Ù·Öµã¡°Êý¾ÝËíµÀ¡±Ö§³ÖMySQL¡¢OracleºÍSQL
ServerµÄÈÕÖ¾ÌáÈ¡£¬ÊµÊ±¸´ÖƵ½ÍⲿÊý¾Ý¿â¡¢KafkaºÍHiveµÈϵͳ¡£¶ÔÓÚºó¶ËÊý¾ÝÏû·Ñ£¬°Ù·Öµã¡°Êý¾ÝËíµÀ¡±¿ÉÒÔÌṩʵʱӦÓá¢ÊµÊ±Á÷´¦Àí¡¢ÉÁ»Ø²éѯ¡¢¿ìÕÕ±í´¦ÀíºÍÀÁ´±í´¦ÀíµÄÖ§³Ö£¬ÎªÊµÊ±Êý¾Ý¸´ÖÆ¡¢ÊµÊ±Êý¾Ý´¦Àí¡¢ODSºÍÊý¾Ý²Ö¿âÌṩ±ãÀû¡£ºóÐø½«Í¨¹ýÒ»¸öÓû§°¸ÀýÀ´½éÉÜ¡°Êý¾ÝËíµÀ¡±ÈçºÎÂú×ãÓû§µÄÊý¾Ý½»»»ÐèÇó¡£

¶Ô±È´«Í³×ö·¨£¬¡°Êý¾ÝËíµÀ¡±¾ßÓеÄϵͳÓÅÊÆÌåÏÖÔÚÒÔϼ¸¸ö·½Ã棺
Êý¾Ý½ÓÈëÎÞÐ迪·¢£¬´ó´óËõ¼õÏîÄ¿¹¤Ê±ºÍ·çÏÕ
Êý¾Ý´«ÊäµÄÒ»ÖÂÐÔ¡¢¼°Ê±ÐԺͿɿ¿ÐÔ¼«Ç¿
ͨ¹ýÊý¾Ý¿âÈÕÖ¾ÌáÈ¡±ä¸ü£¬ÇÖÈëÐÔС£¬¿ªÏú¼«µÍ
Ö§³ÖMySQL¡¢OracleºÍSQL ServerÖ÷Á÷°æ±¾µÄÊý¾Ý½ÓÈë
Ö§³Öʵʱ¸´ÖƵ½KafkaºÍHiveϵͳ£¬·½±ãʵʱºÍÅúÁ¿Êý¾ÝÏû·Ñ
Ö§³Ö×Ô¶¨Òå²å¼þʵÏÖµ½ÈÎÒ⻺´æ¡¢ÏûÏ¢¶ÓÁкÍÊý¾Ý¿âµÄ¸´ÖÆ
Ö§³Ö×Ô¶¨Ò庯ÊýʵÏÖÊý¾ÝµÄÈÎÒâÖØ¹¹
×Ô¶¯»¯ÊµÏÖÀúÊ·Êý¾ÝÇ¨ÒÆ
×Ô¶¯»¯ÊµÏÖÄ¿±ê¿â±í½á¹¹Éú³É
Ö§³Ö¿â¼¶¡¢±í¼¶ºÍÐм¶²¢Ðи´ÖÆ
Ö§³ÖMySQL DDLÉó¼Æ
ÊÊÓÃÓÚODS¡¢DWºÍʵʱÊý¾ÝÏû·ÑµÈ³¡¾°
Ö§³ÖHiveÉÁ»Ø²éѯ¡¢¿ìÕÕ±í´¦ÀíºÍÀÁ´±í´¦Àí

¡°Êý¾ÝËíµÀ¡±Pipeline¼Ü¹¹Í¼
¶þ.Óû§°¸Àý
Ä³ÒøÐÐÓµÓÐ1ÒÚÓû§£¬Óû§Êý¾ÝÓÉ·Ö²¼Ê½¹ØÏµÊý¾Ý¿âÀ´³ÐÔØ£¬²ÉÓÿâ·Ö±í·½Ê½½øÐйÜÀí¡£¸ÃÒøÐÐÆÚÍû¶ÔÓû§Êý¾Ý½øÐзÖÎö£¬ÒÔÍÆ½øÆóÒµµÄ¾«Ï¸»¯¹ÜÀíˮƽ£¬Í¬Ê±¶ÔÕû¸öÊý¾Ý·ÖÎöÏîÄ¿ÓÐÈçÏÂÒªÇó£º
1. Óû§Êý¾ÝÁ¿±È½Ï´ó£¬Êý¾Ý²É¼¯²»ÄܶÔÒµÎñϵͳÐÔÄܲúÉúÓ°Ïì¡£
2. Êý¾Ý¿â²ÉÓ÷ֿâ·Ö±í·½Ê½½øÐд洢£¬Êý¾Ý²É¼¯Òª¿¼Âǵ½Êý¾Ý·ÖƬµÄÓ°Ï죬²¢ÇÒÊý¾Ý·ÖƬ¼äÐèÒª±£Ö¤Ç¿Ò»ÖÂÐÔ£¬·ÀÖ¹³öÏÖ·ÖÎöÊý¾ÝÒì³£¡£
3. Êý¾Ý²É¼¯±ØÐë¼æÈÝ´óÊý¾Ýƽ̨£¬¾¡¿ÉÄܱÜÃâÊÖ¹¤±àÂë¡£
4. Óû§ÆÚÍûʵʱ´¦ÀíºÍÅúÁ¿´¦ÀíÏà½áºÏ£¬Âú×㲻ͬµÄÒµÎñ³¡¾°¡£
¸ÃÒøÐеÄÓû§ÓÉ×ÔÖ÷Ñз¢µÄCRMϵͳÀ´¹ÜÀí¡£ÐÂÓû§×¢²áÔòÐÂÔöÓû§±í¼Ç¼£¬Óû§×ÊÁϱä¸üÔò¸üÐÂÓû§±íÏà¹Ø¼Ç¼£¬Óû§×¢ÏúÔòɾ³ýÓû§±íÏà¹Ø¼Ç¼¡£
Óû§ÊµÊ±·ÖÎö
ʵʱ·ÖÎö½ØÖ¹ÖÁ½ñÈÕ¡¢Ô³õµÄÐÂÔö¡¢¾»ÔöºÍÀÛ¼ÆÓû§Êý£¬ÆäÖÐÐÂÔöÓû§ÊýΪ¸ÃϵͳÐÂ×¢²áÓû§£¬¾»ÔöÓû§ÊýΪÐÂÔöÓû§ÊýÓë×¢ÏúÓû§Êý²îÖµ£¬ÀÛ¼ÆÓû§ÊýΪϵͳµ±Ç°ËùÓÐÓû§Êý¡£¸Ã°¸ÀýÖÐÐÂÔöÓû§ÊýÓÉINSERTÐÐÊý¼ÆË㣬עÏúÓû§ÊýÓÉDELETEÐÐÊý¼ÆË㣬¾»ÔöÓû§ÊýΪÐÂÔöÓû§Êý¼õ×¢ÏúÓû§Êý£¬ÀÛ¼ÆÓû§ÊýÔòÓÉÉÏÒ»ÈÕÀÛ¼ÆÓû§Êý¼Ó¾»ÔöÓû§ÊýµÃ³ö¡£
Time : Ô´Êý¾Ý¿â·¢Éú¸ÃʼþµÄʱ¼ä£¬¼´CommitµÄʱ¼ä¡£
Latency : Ô´Êý¾Ý¿â±ä¸üµ½Ìá½»ÈëKafkaµÄʱ¼äÑÓ³Ù¡£
Seqno : ϵͳ¶ÔÔ´Êý¾Ý¿âÊÂÎñµÄ˳Ðò±àºÅ¡£
EventID : ϵͳ¶ÔÔ´Êý¾Ý¿âÊÂÎñµÄʼþÆðʼλ±àºÅ
Row : ijÊÂÎñÄÚÓ°ÏìµÄÐеÄ˳Ðò±àºÅ£¬ÈçSeqnoΪ1µÄÊÂÎñÖÐÓÐÒ»±ÊInsert²åÈë1ÌõÊý¾Ý£¬Ò»±ÊUpdate¸üе½2ÌõÊý¾Ý£¬×îºóÒ»±ÊDeleteɾ³ýµ½3ÌõÊý¾Ý£¬ÔòRow´Ó1µ½6·Ö±ð´ú±íÕâ6Ðеıä¸üÐòºÅ¡£
Action : INSERT¡¢UPDATE¡¢DELETE¡¢CREATE_TABLE¡¢DROP_TABLEµÈ£¬ÆäÖÐINSERT¡¢UPDATEºÍDELETEËùÓÐÊý¾Ý¿â¾ùÊÊÓã¬ÆäËüÀàActionÔòÓëÏà¹ØÊý¾Ý¿âÓйأ¬OracleÔÝʱûÓÐÆäËüÀàAction¡£
BeforeºÍAfter : ¶ÔÓÚDELETEʼþ£¬Before¿éÌî³äÊý¾Ý£¬ÎÞAfter¿é¡£¶ÔÓÚINSERTʼþ£¬ÎÞBefore¿é£¬After¿éÌî³äÊý¾Ý¡£¶ÔÓÚUPDATEʼþ£¬Before¿éÌî³ä¸üÐÂǰµÄÖµ£¬After¿éÌî³ä¸üкóµÄÖµ¡£
INSERT¿é
{ "Time": "2014-10-10 13:50:52.0", "Latency": 0, "Seqno": 1, "EventID": "my:46460573", "Row": 1, "Schema": "CRM", "Table": "USER", "Action": "INSERT", "After": { "ID": "1", "Name": "Jim", "Birthday": "1988-10-09 19:48:10.656", "Company": "Baifendian", "Money": "8000" } } |
UPDATE¿é
{ "Time": "2015-10-10 13:50:52.0", "Latency": 0, "Seqno": 2, "EventID": "my:56460573", "Row": 1, "Schema": "CRM", "Table": "USER", "Action": "UPDATE", "Before": { "ID": "1", "Name": "Jim", "Birthday": "1988-10-09 19:48:10.656", "Company": "Baifendian", "Money": "8000" }, "After": { "ID": "1", "Name": "Jim", "Birthday": "2015-10-09 19:48:10.656", "Company": "Baifendian", "Money": "10000" } } |
ʵʱ¸´ÖÆ
Kafka ApplierÅäÖãº
# Kafka applier configuration. replicator.applier.dbms=com.continuent.tungsten.replicator.applier.KafkaApplier replicator.applier.dbms.dataSource=global replicator.applier.dbms.zkConnect=172.24.4.18:2171/kafka replicator.applier.dbms.kafkaBroker=192.24.4.10:9092,192.24.4.11:9092,192.24.4.12:9092 replicator.applier.dbms.kafkaSerializer=kafka.serializer.StringEncoder # kafkaTopicPrefix value can be Baifendian.Input. which is categoried by organization unit or system or both. replicator.applier.dbms.kafkaTopicPrefix=cdc. # kafkaTopicRotationInterval value can be 1d(Rotate every day),
6h(Rotate every 6 hours),10m(Rotate every 10 minutes) or empty(Never rotate) replicator.applier.dbms.kafkaTopicRotationInterval=1d |
Á÷ʽ´¦Àí
°Ù·Öµã¡°Êý¾ÝËíµÀ¡±Ö±½Ó½«Êý¾Ý¿â±ä¸üÊý¾Ý¸´ÖƵ½Kafka£¬Storm»òFlink»ùÓÚKafkaÄڵıä¸üÊý¾Ýʵʱ¼ÆËãÐÂÔöÓû§¡¢¾»ÔöÓû§¡¢×¢ÏúÓû§ºÍÀÛ¼ÆÓû§Êý¡£
Óû§ÀëÏß·ÖÎö
·ÖÎöÿÈÕÓû§ÐÂÔö¡¢¾»ÔöºÍÀÛ¼ÆÓû§Êý£¬ÆäÖÐÐÂÔöÓû§ÊýΪ¸ÃϵͳÐÂ×¢²áÓû§£¬¾»ÔöÓû§ÊýΪÐÂÔöÓû§ÊýÓë×¢ÏúÓû§Êý²îÖµ£¬ÀÛ¼ÆÓû§ÊýΪϵͳµ±Ç°ËùÓÐÓû§Êý¡£¸Ã°¸ÀýÖÐÐÂÔöÓû§ÊýÓÉINSERTÐÐÊý¼ÆË㣬עÏúÓû§ÊýÓÉDELETEÐÐÊý¼ÆË㣬¾»ÔöÓû§ÊýΪÐÂÔöÓû§Êý¼õ×¢ÏúÓû§Êý£¬ÀÛ¼ÆÓû§ÊýÔòÓÉÉÏÒ»ÈÕÀÛ¼ÆÓû§Êý¼Ó¾»ÔöÓû§ÊýµÃ³ö¡£
ʵʱ¸´ÖÆ
°Ù·Öµã¡°Êý¾ÝËíµÀ¡±¿ÉÒÔ¸ù¾Ýʼþʱ¼ä°´Hive·ÖÇø±í·½Ê½ÊµÊ±¸´ÖÆÊý¾Ý¡£HdfsApplierÒÔ¶à¸öHDFSÎļþÁ÷µÄ·½Ê½½«±ä¸üÊý¾Ý²¢ÐÐдÈëHDFSÎļþϵͳ¡£HiveʹÓÃÍⲿ±í¶Ôµ×²ãÊý¾Ý½øÐвéѯ¡£
HDFS ApplierÅäÖãº
# HDFS applier configuration. replicator.applier.dbms=com.continuent.tungsten.replicator.applier.HdfsApplier replicator.applier.dbms.dataSource=global replicator.applier.dbms.hdfsConfURI= replicator.applier.dbms.hdfsURI=hdfs://hostname:8020 replicator.applier.dbms.hdfsPrefix=/user/hive/warehouse replicator.applier.dbms.hdfsUser=hdfs replicator.applier.dbms.hdfsSyncInterval=10 replicator.applier.dbms.hdfsRotationInterval=1d replicator.applier.dbms.hdfsRotationTimezone=8 replicator.applier.dbms.maxFDs=20 replicator.applier.dbms.hdfsSchemaPrefix= replicator.applier.dbms.hdfsTablePrefix=chg_ replicator.applier.dbms.fieldDelimiter=\\001 replicator.applier.dbms.lineDelimiter= |
ÀëÏß¼ÆËã
ͨ¹ýÒÔϲéѯ¿ÉÒÔ»ñµÃÓû§±íÈÎÒâʱµãµÄ¿ìÕÕ¡£»ùÓÚ´ËÔÀí£¬°Ù·Öµã¡°Êý¾ÝËíµÀ¡±¿ÉÒÔÖ§³ÖÉÁ»Ø²éѯ¡¢Êý¾Ý¿ìÕÕ¡¢Êý¾ÝÀÁ´µÈÓ¦Óá£ÓÉÓÚ¡°Êý¾ÝËíµÀ¡±¿É¾«È·²¶»ñÊý¾ÝµÄ²Ù×÷ʱ¼äºÍ²Ù×÷ÀàÐÍ£¬Òò´ËÐÂÔöÓû§¡¢¾»ÔöÓû§¡¢×¢ÏúÓû§ºÍÀÛ¼ÆÓû§ÊýµÄ¼ÆËãÒ²±äµÃ·Ç³£¼òµ¥¡£
select * from ( select row_number() over(partition by userid order by seqno desc, row desc) as R,* from USER where time <= '2016-01-01 00:00:00') snapshot where R= 1 and action <> 'D'; |
Èý.×ܽá
°Ù·Öµã¡°Êý¾ÝËíµÀ¡±Äܹ»ÊµÊ±ÌáÈ¡MySQL¡¢OracleºÍSQL ServerµÄÊý¾Ý±ä¸ü£¬²¢ÇÒ¿ÉÒÔ¶Ô±ä¸üÊý¾Ý½øÐÐʵʱת»»£¬ÔÙÓ¦ÓÃÓÚÊý¾Ý¿âϵͳ¡¢´óÊý¾Ýϵͳ¡¢ÏûϢϵͳ¡¢»º´æÏµÍ³ºÍʵʱ¼ÆË㣬Âú×ãÒøÐС¢µçÐÅ¡¢±£ÏÕ¡¢ÕþÎñºÍµçÉ̵ÈÁìÓòµÄÊý¾Ý½»»»ÐèÇó¡£ |