Ëæ×Ų¢·¢Á¿¡¢Êý¾ÝÁ¿Ô½À´Ô½´ó¼°ÒµÎñÒѾϸ»¯µ½²»ÄÜÔÙ°´ÕÕÒµÎñ»®·Ö£¬ÎÒÃDz»µÃ²»Ê¹Ó÷ֲ¼Ê½Êý¾Ý¿âÌá¸ßϵͳµÄÐÔÄÜ¡£ÔÚ·Ö²¼Ê½ÏµÍ³ÖУ¬¸÷¸ö½ÚµãÔÚÎïÀíÉ϶¼ÊÇÏà¶Ô¶ÀÁ¢µÄ£¬Ã¿¸ö½ÚµãÉϵÄÊý¾Ý²Ù×÷¶¼¿ÉÒÔÂú×ã
ACID¡£µ«ÊÇ£¬¸÷¶ÀÁ¢½ÚµãÖ®¼äÎÞ·¨ÖªµÀÆäËû½ÚµãÊÂÎñµÄÖ´ÐÐÇé¿ö£¬Èç¹ûÏëÈöą̀»úÆ÷ÖеÄÊý¾Ý±£´æÒ»Ö£¬¾Í±ØÐë±£Ö¤ËùÓнڵãÉϵÄÊý¾Ý²Ù×÷Ҫôȫ²¿Ö´Ðгɹ¦£¬ÒªÃ´È«²¿²»Ö´ÐУ¬±È½Ï³£¹æµÄ½â¾ö·½·¨ÊÇÒýÈ롰е÷Õß¡±À´Í³Ò»µ÷¶ÈËùÓнڵãµÄÖ´ÐС£
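XA Specification
The X/Open organization (now The Open Group) defined a distributed transaction processing model. The X/Open DTP model (1994) consists of four parts: the application program (AP), the transaction manager (TM), the resource manager (RM), and the communication resource manager (CRM). The transaction manager (TM) is the transaction middleware, the resource manager (RM) is the database, and the communication resource manager (CRM) is the messaging middleware.
Transaction processing inside a single database is usually regarded as a local transaction, whereas distributed transaction processing deals with global transactions. A global transaction is a unit of work in a distributed transaction processing environment that several databases have to complete together. One transaction may update several different databases. In that case a database can commit its own internal operations only if its own operations succeed and the operations of every other database involved in the global transaction succeed as well. If any operation on any database fails, all operations performed by all databases participating in the transaction must be rolled back.
XA is the interface specification (that is, the set of interface functions) that X/Open DTP defines between the transaction middleware and the database. The transaction middleware uses it to notify the database of transaction start, end, commit, rollback, and so on. The XA interface functions are provided by the database vendor. The two-phase commit protocol and the three-phase commit protocol derive from this idea.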
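Two-Phase Commit
The two phases are the prepare phase and the commit phase.
In the prepare phase, the transaction coordinator (the transaction manager) sends a prepare message to every participant (resource manager). Each participant either returns a failure message directly (for example, when permission verification fails) or executes the transaction locally, writing its local redo and undo logs without committing. The prepare phase can be further divided into the following three steps.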
(1) The coordinator node asks all participant nodes whether they can perform the commit (the vote) and starts waiting for their responses.
(2) Each participant node executes all transaction operations issued up to the point of the inquiry and writes the undo and redo information to its log.
(3) Each participant node answers the coordinator's inquiry. If its transaction operations actually succeeded, it returns an "agree" message; if they actually failed, it returns an "abort" message.
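In the commit phase, if the coordinator has received a failure message from any participant or has timed out, it sends a rollback (Rollback) message to every participant; otherwise it sends a commit (Commit) message. Each participant commits or rolls back as instructed by the coordinator and then releases all lock resources held while processing the transaction.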
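The two phases can be sketched in a few lines of Java. This is a minimal in-memory illustration, not an XA implementation: the Participant interface, InMemoryParticipant, and run are names made up for this example, and a timeout is modeled simply as an exception thrown by prepare().

```java
import java.util.List;

public class TwoPhaseCommitSketch {
    /** Hypothetical participant API for illustration; not part of XA or Mycat. */
    interface Participant {
        boolean prepare();   // execute locally, write undo/redo logs, then vote
        void commit();
        void rollback();
    }

    /** In-memory participant that only records its state. */
    static class InMemoryParticipant implements Participant {
        private final boolean canPrepare;
        String state = "INIT";

        InMemoryParticipant(boolean canPrepare) {
            this.canPrepare = canPrepare;
        }

        public boolean prepare() {
            state = canPrepare ? "PREPARED" : "FAILED";
            return canPrepare;
        }

        public void commit() { state = "COMMITTED"; }

        public void rollback() { state = "ROLLED_BACK"; }
    }

    /** Runs both phases; returns true if the global transaction committed. */
    static boolean run(List<? extends Participant> participants) {
        // Phase 1 (prepare): collect votes, stop at the first failure.
        boolean allAgreed = true;
        for (Participant p : participants) {
            boolean vote;
            try {
                vote = p.prepare();
            } catch (RuntimeException e) {
                vote = false; // assumption: an exception stands in for a timeout
            }
            if (!vote) {
                allAgreed = false;
                break;
            }
        }
        // Phase 2 (commit): send the same decision to every participant.
        for (Participant p : participants) {
            if (allAgreed) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allAgreed;
    }

    public static void main(String[] args) {
        List<InMemoryParticipant> nodes =
                List.of(new InMemoryParticipant(true), new InMemoryParticipant(false));
        System.out.println("committed = " + run(nodes));
        for (InMemoryParticipant n : nodes) {
            System.out.println(n.state);
        }
    }
}
```

Note that the coordinator sends the rollback to every participant, including those that voted to agree, which is what keeps the outcome uniform.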
Two-phase commit has the following drawbacks.
(1) Synchronous blocking. During execution all participating nodes block on the transaction. While a participant holds a shared resource, any third-party node that wants to access that resource has to block.
(2) Single point of failure. The coordinator is critical: once it fails, the participants stay blocked indefinitely.
(3) Data inconsistency. In the second phase, if a partial network failure occurs after the coordinator has sent commit requests, or the coordinator itself fails while sending them, only some of the participants receive the commit request. Those participants commit as soon as they receive it, while the machines that never received it cannot commit, so the distributed system as a whole ends up with inconsistent data.
Because two-phase commit suffers from defects such as synchronous blocking, a single point of failure, data inconsistency, and vulnerability to crashes, researchers improved on it and proposed three-phase commit.
Three-Phase Commit
Three-phase commit (3PC), also called the three-phase commit protocol, is an improved version of two-phase commit (2PC). It splits the prepare phase of two-phase commit in two, which gives three phases: CanCommit, PreCommit, and DoCommit.
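(1) CanCommit phase: this phase closely resembles the prepare phase of two-phase commit. The coordinator sends a commit inquiry (the CanCommit request) to the participants. A participant that can commit returns a Yes response; otherwise it returns a No response.
(2) PreCommit phase: based on the participants' responses, the coordinator decides whether the transaction can proceed to the PreCommit operation. There are two possible cases.
1. If the coordinator received a Yes response from every participant, the transaction is pre-executed: the coordinator sends a PreCommit request, and each participant executes the transaction operations, records the undo and redo information, and replies with an ACK.
2. If any participant sent a No response, or the coordinator timed out without hearing from the participants, the transaction is aborted.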
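(3) DoCommit phase: the real transaction commit takes place in this phase. Again there are two cases: committing and aborting.
The commit case proceeds as follows.
1. After receiving the ACK responses from the participants, the coordinator moves from the pre-commit state to the commit state and sends a doCommit request to all participants.
2. On receiving the doCommit request, each participant performs the formal commit and releases all transaction resources once the commit has completed.
3. After committing, the participant sends an ACK response to the coordinator.
4. After receiving the ACK responses from all participants, the coordinator completes the transaction.
The abort case proceeds as follows.
1. The coordinator sends an abort request to all participants.
2. On receiving the abort request, each participant uses the undo information it recorded in the second phase to roll the transaction back, and releases all transaction resources once the rollback has completed.
3. After rolling back, the participant sends an ACK message to the coordinator.
4. After receiving the ACK messages from the participants, the coordinator completes the abort of the transaction.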
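The three phases can be laid out as a small Java sketch of the coordinator's control flow. It is only an illustration under simplifying assumptions: the Participant interface and ScriptedParticipant are hypothetical names, a missing ACK is modeled as preCommit() returning false, and the participant-side timeouts of a full 3PC implementation are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreePhaseCommitSketch {
    enum Outcome { COMMITTED, ABORTED }

    /** Hypothetical participant API for illustration only. */
    interface Participant {
        boolean canCommit();  // phase 1: Yes (true) or No (false)
        boolean preCommit();  // phase 2: execute, log undo/redo, return ACK
        void doCommit();      // phase 3: formal commit
        void abort();         // roll back using the recorded undo information
    }

    /** Participant with scripted answers that records every request it receives. */
    static class ScriptedParticipant implements Participant {
        final boolean voteYes;
        final boolean ackPreCommit;
        final List<String> received = new ArrayList<>();

        ScriptedParticipant(boolean voteYes, boolean ackPreCommit) {
            this.voteYes = voteYes;
            this.ackPreCommit = ackPreCommit;
        }

        public boolean canCommit() { received.add("canCommit"); return voteYes; }

        public boolean preCommit() { received.add("preCommit"); return ackPreCommit; }

        public void doCommit() { received.add("doCommit"); }

        public void abort() { received.add("abort"); }
    }

    static Outcome run(List<? extends Participant> participants) {
        // Phase 1 (CanCommit): any No response aborts the transaction.
        for (Participant p : participants) {
            if (!p.canCommit()) {
                return abortAll(participants);
            }
        }
        // Phase 2 (PreCommit): every participant must acknowledge.
        for (Participant p : participants) {
            if (!p.preCommit()) {
                return abortAll(participants);
            }
        }
        // Phase 3 (DoCommit): perform the formal commit everywhere.
        for (Participant p : participants) {
            p.doCommit();
        }
        return Outcome.COMMITTED;
    }

    private static Outcome abortAll(List<? extends Participant> participants) {
        for (Participant p : participants) {
            p.abort();
        }
        return Outcome.ABORTED;
    }

    public static void main(String[] args) {
        List<ScriptedParticipant> nodes = List.of(
                new ScriptedParticipant(true, true),
                new ScriptedParticipant(true, true));
        System.out.println(run(nodes) + " " + nodes.get(0).received);
    }
}
```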
Implementation of Distributed Transactions in Mycat
From version 1.6 onward, Mycat fully supports XA, the strongly consistent type of distributed transaction. A simple example first shows how XA is used in Mycat.
On the user application (AP) side, the workflow is as follows:
(1) set autocommit=0
The application layer must turn off automatic commit.
(2) set xa=on
Switch XA on through SQL.
(3) Execute the SQL
insert into travelrecord(id,name) values(1,'N'),(6000000,'A'),(321,'D'),(13400000,'C'),(59,'E');
(4) commit or rollback
Commit the transaction; it either commits successfully or is rolled back on an exception. The complete flow is shown in the figure.

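Inside Mycat, the implementation flow is as follows:
(1) set autocommit=0
autocommit in MySQLConnection is set to false.
(2) set xa=on
Mycat turns on its XA transaction manager and generates an XID with MycatServer.getInstance().genXATXID(). It marks the start of the XA transaction with the XA START XID command and then assembles the business SQL (Mycat shards the insert above across different nodes), followed by XA END XID and XA PREPARE XID. Finally it performs the 1pc (first-phase) commit and records it in tm.log. If an exception occurs during the 1pc phase, the transaction is rolled back directly with XA ROLLBACK xid.
(3) The 2pc (second-phase) commit (XA COMMIT) is performed on all of the MySQL nodes. When the commit succeeds, the transaction ends; if an exception occurs, the transaction is committed again or rolled back.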
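The statement sequence each MySQL node sees can be made concrete with a short Java sketch. It only builds the command strings in the order described above; XaCommandSketch, phaseOne, and phaseTwo are illustrative names rather than Mycat classes, and the xid value and the split of rows onto one node are made-up placeholders.

```java
import java.util.ArrayList;
import java.util.List;

public class XaCommandSketch {
    /** First phase on one node: start, business SQL, end, prepare. */
    static List<String> phaseOne(String xid, List<String> businessSql) {
        List<String> cmds = new ArrayList<>();
        cmds.add("XA START " + xid);
        cmds.addAll(businessSql);
        cmds.add("XA END " + xid);
        cmds.add("XA PREPARE " + xid);
        return cmds;
    }

    /** Second phase: commit if every node prepared, otherwise roll back. */
    static String phaseTwo(String xid, boolean allPrepared) {
        return (allPrepared ? "XA COMMIT " : "XA ROLLBACK ") + xid;
    }

    public static void main(String[] args) {
        // Hypothetical xid, quoted because MySQL expects a string literal here.
        String xid = "'demo_xid_1'";
        // Hypothetical share of the insert that is routed to one node.
        List<String> sql = List.of(
                "insert into travelrecord(id,name) values(1,'N'),(321,'D'),(59,'E')");
        for (String cmd : phaseOne(xid, sql)) {
            System.out.println(cmd + ";");
        }
        System.out.println(phaseTwo(xid, true) + ";");
    }
}
```

Separating the two phases into two functions mirrors the flow in the text: the decision passed to phaseTwo is only known after every node has answered its XA PREPARE.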
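Exception handling for XA distributed transactions in Mycat works as follows:
(1) First-phase commit exception: if any MySQL node cannot commit or raises an exception during the 1pc commit, the transaction is rolled back on all nodes and an exception is thrown to the application side so that it rolls back as well.
(2) Mycat crash recovery: after Mycat crashes, recovery on restart is driven by the tm.log transaction log. Once started, Mycat reads the transaction log, looks up the XA transactions that are already prepared on each node, and commits or rolls them back.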
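One way to picture the recovery decision is the following Java sketch. It is not Mycat's recovery code and does not read the real tm.log format. It assumes a simplified log with one record per participant and applies an assumed policy: commit a transaction only if every participant was logged as prepared (or already committed), otherwise roll it back. LogEntry, State, and plan are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class XaRecoverySketch {
    enum State { STARTED, PREPARED, COMMITTED, ROLLED_BACK }

    /** Hypothetical log record: the last logged state of one participant. */
    static class LogEntry {
        final String xid;
        final String node;
        final State state;

        LogEntry(String xid, String node, State state) {
            this.xid = xid;
            this.node = node;
            this.state = state;
        }
    }

    /**
     * Assumed policy, one decision per xid: XA COMMIT if all of its
     * participants reached PREPARED or COMMITTED, otherwise XA ROLLBACK.
     */
    static Map<String, String> plan(List<LogEntry> log) {
        Map<String, Boolean> allPrepared = new LinkedHashMap<>();
        for (LogEntry e : log) {
            boolean ok = e.state == State.PREPARED || e.state == State.COMMITTED;
            allPrepared.merge(e.xid, ok, Boolean::logicalAnd);
        }
        Map<String, String> decisions = new LinkedHashMap<>();
        for (Map.Entry<String, Boolean> en : allPrepared.entrySet()) {
            String verb = en.getValue() ? "XA COMMIT " : "XA ROLLBACK ";
            decisions.put(en.getKey(), verb + en.getKey());
        }
        return decisions;
    }

    public static void main(String[] args) {
        List<LogEntry> log = List.of(
                new LogEntry("'tx1'", "dn1", State.PREPARED),
                new LogEntry("'tx1'", "dn2", State.PREPARED),
                new LogEntry("'tx2'", "dn1", State.PREPARED),
                new LogEntry("'tx2'", "dn2", State.STARTED));
        plan(log).values().forEach(System.out::println);
    }
}
```

In a real recovery pass the resulting command would be sent to each node that still holds the prepared transaction; the sketch stops at the decision.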
1. Class Overview
When the user application sends the SQL statement set xa = on; it switches on Mycat's internal XA transaction manager, which then manages transactions on the MySQL databases in XA mode. The transaction management is implemented by the following classes:
MySQLConnection: database connection.
NonBlockingSession: user connection session.
MultiNodeCoordinator: coordinator.
CommitNodeHandler: per-shard commit handling.
RollbackNodeHandler: per-shard rollback handling.
2. Code Walkthrough
The source code that starts an XA transaction is as follows:
public class MySQLConnection extends BackendAIOConnection {
    // Append the SET autocommit statement that switches the transaction mode
    private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {
        if (autoCommit) {
            sb.append("SET autocommit=1;");
        } else {
            sb.append("SET autocommit=0;");
        }
    }

    public void execute(RouteResultsetNode rrn, ServerConnection sc, boolean autocommit)
            throws UnsupportedEncodingException {
        if (!modifiedSQLExecuted && rrn.isModifySQL()) {
            modifiedSQLExecuted = true;
        }
        // Get the current transaction ID
        String xaTXID = sc.getSession2().getXaTXID();
        synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(), autocommit);
    }

    // Code omitted here; see MySQLConnection.java in the MyCAT-Server project on GitHub
}
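After the user application switches to manual commit, Mycat adds the SET autocommit=0; statement to the current connection (see getAutocommitCommand above). The statement is appended to a StringBuilder and waits to be submitted to the database.
The source code of the user connection session is as follows:
public class NonBlockingSession implements Session {
    // Code omitted here; see NonBlockingSession.java in the MyCAT-Server project on GitHub
}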
Analysis of the SET XA = ON; Statement
The user application sends this statement to Mycat. After the SQL parser has parsed it, it is handed to SetHandler for processing:
c.getSession2().setXATXEnabled(true);
This calls the setXATXEnabled method of NonBlockingSession, which switches XA on and generates the XID. The code is as follows:
public void setXATXEnabled(boolean xaTXEnabled) {
    LOGGER.info("XA Transaction enabled ,con " + this.getSource());
    // Generate the XID only once per session
    if (xaTXEnabled && this.xaTXID == null) {
        xaTXID = genXATXID();
    }
}
In addition, NonBlockingSession receives the commit sent by the user application and calls its commit method to handle the transaction commit logic.
The commit() method first checks the number of nodes; a single node and multiple nodes follow different paths. Only the multi-node path is covered here:
checkDistriTransaxAndExecute();
This method commits the transaction on multiple nodes.
The source code of the coordinator is as follows:
public class MultiNodeCoordinator implements ResponseHandler {
    // Code omitted here; see MultiNodeCoordinator.java in the MyCAT-Server project on GitHub
}
In the checkDistriTransaxAndExecute() method, the NonBlockingSession class delegates the actual work to MultiNodeCoordinator, the class dedicated to coordinating multiple nodes. In MultiNodeCoordinator, the executeBatchNodeCmd method contains the handling for the XA 1PC commit. The code fragment is as follows:
for (RouteResultsetNode rrn : session.getTargetKeys()) {
    // ...
    if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
        // recovery log
        participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0,
                conn.getSchema(), ((MySQLConnection) conn).getXaStatus());
        String[] cmds = new String[]{"XA END " + xaTxId, "XA PREPARE " + xaTxId};
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the batch cmd : " + cmds[0] + ";" + cmds[1] + ","
                    + "current connection:" + conn.getHost() + ":" + conn.getPort());
        }
        mysqlCon.execBatchCmd(cmds);
    }
    // ...
}
The okResponse method of the MultiNodeCoordinator class then performs the 2pc transaction commit:
MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus()) {
    case TxState.TX_STARTED_STATE:
        if (mysqlCon.batchCmdFinished()) {
            String xaTxId = session.getXaTXID();
            String cmd = "XA COMMIT " + xaTxId;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Start execute the cmd :" + cmd + ",current host:"
                        + mysqlCon.getHost() + ":" + mysqlCon.getPort());
            }
            // recovery log
            CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);
            for (int i = 0; i < coordinatorLogEntry.participants.length; i++) {
                LOGGER.debug("[In MemoryCoordinatorLogEntry]" + coordinatorLogEntry.participants[i]);
                if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) {
                    coordinatorLogEntry.participants[i].txState = TxState.TX_PREPARED_STATE;
                }
            }
            inMemoryRepository.put(session.getXaTXID(), coordinatorLogEntry);
            fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
            // send commit
            mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
            mysqlCon.execCmd(cmd);
        }
        return;
    // ...
}
The source code for per-shard transaction commit handling is as follows:
public class CommitNodeHandler implements ResponseHandler {
    // End the XA transaction
    public void commit(BackendConnection conn) {
        // Code omitted here; see CommitNodeHandler.java in the MyCAT-Server project on GitHub
    }

    // Commit the XA transaction
    @Override
    public void okResponse(byte[] ok, BackendConnection conn) {
        // Code omitted here; see CommitNodeHandler.java in the MyCAT-Server project on GitHub
    }
}
Mycat also supports XA transactions against a single-node MySQL database. The CommitNodeHandler class performs the two-phase XA handling for a single node in the same way as the MultiNodeCoordinator class: the commit method performs the 1pc commit, and the okResponse method performs the 2pc transaction commit.
The source code for per-shard transaction rollback handling is as follows:
public class RollbackNodeHandler extends MultiNodeHandler {
    // Code omitted here; see RollbackNodeHandler.java in the MyCAT-Server project on GitHub
}
The rollback method of RollbackNodeHandler includes the rollback handling for XA transactions. A rollback issued by the user application is processed in this method.
for (final RouteResultsetNode node : session.getTargetKeys()) {
    // ...
    // support the XA rollback
    MySQLConnection mysqlCon = (MySQLConnection) conn;
    if (session.getXaTXID() != null) {
        String xaTxId = session.getXaTXID();
        mysqlCon.execCmd("XA END " + xaTxId + ";");
        mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
    } else {
        conn.rollback();
    }
    // ...
}
Likewise, this method issues the XA ROLLBACK command to all of the MySQL database nodes.