14
14
15
15
#include "postgres.h"
16
16
17
+ #include "access/commit_ts.h"
17
18
#include "access/genam.h"
18
19
#include "access/gist.h"
19
20
#include "access/relscan.h"
20
21
#include "access/tableam.h"
21
22
#include "access/transam.h"
22
23
#include "access/xact.h"
24
+ #include "access/heapam.h"
23
25
#include "catalog/pg_am_d.h"
24
26
#include "commands/trigger.h"
25
27
#include "executor/executor.h"
36
38
37
39
38
40
static bool tuples_equal (TupleTableSlot * slot1 , TupleTableSlot * slot2 ,
39
- TypeCacheEntry * * eq );
41
+ TypeCacheEntry * * eq , Bitmapset * columns );
40
42
41
43
/*
42
44
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
@@ -221,7 +223,7 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
221
223
if (eq == NULL )
222
224
eq = palloc0 (sizeof (* eq ) * outslot -> tts_tupleDescriptor -> natts );
223
225
224
- if (!tuples_equal (outslot , searchslot , eq ))
226
+ if (!tuples_equal (outslot , searchslot , eq , NULL ))
225
227
continue ;
226
228
}
227
229
@@ -277,10 +279,13 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
277
279
278
280
/*
279
281
* Compare the tuples in the slots by checking if they have equal values.
282
+ *
283
+ * If 'columns' is not null, only the columns specified within it will be
284
+ * considered for the equality check, ignoring all other columns.
280
285
*/
281
286
static bool
282
287
tuples_equal (TupleTableSlot * slot1 , TupleTableSlot * slot2 ,
283
- TypeCacheEntry * * eq )
288
+ TypeCacheEntry * * eq , Bitmapset * columns )
284
289
{
285
290
int attrnum ;
286
291
@@ -305,6 +310,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
305
310
if (att -> attisdropped || att -> attgenerated )
306
311
continue ;
307
312
313
+ /*
314
+ * Ignore columns that are not listed for checking.
315
+ */
316
+ if (columns &&
317
+ !bms_is_member (att -> attnum - FirstLowInvalidHeapAttributeNumber ,
318
+ columns ))
319
+ continue ;
320
+
308
321
/*
309
322
* If one value is NULL and other is not, then they are certainly not
310
323
* equal
@@ -380,7 +393,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
380
393
/* Try to find the tuple */
381
394
while (table_scan_getnextslot (scan , ForwardScanDirection , scanslot ))
382
395
{
383
- if (!tuples_equal (scanslot , searchslot , eq ))
396
+ if (!tuples_equal (scanslot , searchslot , eq , NULL ))
384
397
continue ;
385
398
386
399
found = true;
@@ -455,6 +468,236 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
455
468
}
456
469
}
457
470
471
+ /*
472
+ * If the tuple is recently dead and was deleted by a transaction with a newer
473
+ * commit timestamp than previously recorded, update the associated transaction
474
+ * ID, commit time, and origin. This helps ensure that conflict detection uses
475
+ * the most recent and relevant deletion metadata.
476
+ */
477
+ static void
478
+ update_most_recent_deletion_info (TupleTableSlot * scanslot ,
479
+ TransactionId oldestxmin ,
480
+ TransactionId * delete_xid ,
481
+ TimestampTz * delete_time ,
482
+ RepOriginId * delete_origin )
483
+ {
484
+ BufferHeapTupleTableSlot * hslot ;
485
+ HeapTuple tuple ;
486
+ Buffer buf ;
487
+ bool recently_dead = false;
488
+ TransactionId xmax ;
489
+ TimestampTz localts ;
490
+ RepOriginId localorigin ;
491
+
492
+ hslot = (BufferHeapTupleTableSlot * ) scanslot ;
493
+
494
+ tuple = ExecFetchSlotHeapTuple (scanslot , false, NULL );
495
+ buf = hslot -> buffer ;
496
+
497
+ LockBuffer (buf , BUFFER_LOCK_SHARE );
498
+
499
+ /*
500
+ * We do not consider HEAPTUPLE_DEAD status because it indicates either
501
+ * tuples whose inserting transaction was aborted (meaning there is no
502
+ * commit timestamp or origin), or tuples deleted by a transaction older
503
+ * than oldestxmin, making it safe to ignore them during conflict
504
+ * detection (See comments atop worker.c for details).
505
+ */
506
+ if (HeapTupleSatisfiesVacuum (tuple , oldestxmin , buf ) == HEAPTUPLE_RECENTLY_DEAD )
507
+ recently_dead = true;
508
+
509
+ LockBuffer (buf , BUFFER_LOCK_UNLOCK );
510
+
511
+ if (!recently_dead )
512
+ return ;
513
+
514
+ xmax = HeapTupleHeaderGetUpdateXid (tuple -> t_data );
515
+ if (!TransactionIdIsValid (xmax ))
516
+ return ;
517
+
518
+ /* Select the dead tuple with the most recent commit timestamp */
519
+ if (TransactionIdGetCommitTsData (xmax , & localts , & localorigin ) &&
520
+ TimestampDifferenceExceeds (* delete_time , localts , 0 ))
521
+ {
522
+ * delete_xid = xmax ;
523
+ * delete_time = localts ;
524
+ * delete_origin = localorigin ;
525
+ }
526
+ }
527
+
528
+ /*
529
+ * Searches the relation 'rel' for the most recently deleted tuple that matches
530
+ * the values in 'searchslot' and is not yet removable by VACUUM. The function
531
+ * returns the transaction ID, origin, and commit timestamp of the transaction
532
+ * that deleted this tuple.
533
+ *
534
+ * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
535
+ * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
536
+ * conflict detection.
537
+ *
538
+ * Instead of stopping at the first match, we scan all matching dead tuples to
539
+ * identify most recent deletion. This is crucial because only the latest
540
+ * deletion is relevant for resolving conflicts.
541
+ *
542
+ * For example, consider a scenario on the subscriber where a row is deleted,
543
+ * re-inserted, and then deleted again only on the subscriber:
544
+ *
545
+ * - (pk, 1) - deleted at 9:00,
546
+ * - (pk, 1) - deleted at 9:02,
547
+ *
548
+ * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
549
+ *
550
+ * If we mistakenly return the older deletion (9:00), the system may wrongly
551
+ * apply the remote update using a last-update-wins strategy. Instead, we must
552
+ * recognize the more recent deletion at 9:02 and skip the update. See
553
+ * comments atop worker.c for details. Note, as of now, conflict resolution
554
+ * is not implemented. Consequently, the system may incorrectly report the
555
+ * older tuple as the conflicted one, leading to misleading results.
556
+ *
557
+ * The commit timestamp of the deleting transaction is used to determine which
558
+ * tuple was deleted most recently.
559
+ */
560
+ bool
561
+ RelationFindDeletedTupleInfoSeq (Relation rel , TupleTableSlot * searchslot ,
562
+ TransactionId oldestxmin ,
563
+ TransactionId * delete_xid ,
564
+ RepOriginId * delete_origin ,
565
+ TimestampTz * delete_time )
566
+ {
567
+ TupleTableSlot * scanslot ;
568
+ TableScanDesc scan ;
569
+ TypeCacheEntry * * eq ;
570
+ Bitmapset * indexbitmap ;
571
+ TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr (rel );
572
+
573
+ Assert (equalTupleDescs (desc , searchslot -> tts_tupleDescriptor ));
574
+
575
+ * delete_xid = InvalidTransactionId ;
576
+ * delete_origin = InvalidRepOriginId ;
577
+ * delete_time = 0 ;
578
+
579
+ /*
580
+ * If the relation has a replica identity key or a primary key that is
581
+ * unusable for locating deleted tuples (see
582
+ * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
583
+ * necessary. In such cases, comparing the entire tuple is not required,
584
+ * since the remote tuple might not include all column values. Instead,
585
+ * the indexed columns alone are suffcient to identify the target tuple
586
+ * (see logicalrep_rel_mark_updatable).
587
+ */
588
+ indexbitmap = RelationGetIndexAttrBitmap (rel ,
589
+ INDEX_ATTR_BITMAP_IDENTITY_KEY );
590
+
591
+ /* fallback to PK if no replica identity */
592
+ if (!indexbitmap )
593
+ indexbitmap = RelationGetIndexAttrBitmap (rel ,
594
+ INDEX_ATTR_BITMAP_PRIMARY_KEY );
595
+
596
+ eq = palloc0 (sizeof (* eq ) * searchslot -> tts_tupleDescriptor -> natts );
597
+
598
+ /*
599
+ * Start a heap scan using SnapshotAny to identify dead tuples that are
600
+ * not visible under a standard MVCC snapshot. Tuples from transactions
601
+ * not yet committed or those just committed prior to the scan are
602
+ * excluded in update_most_recent_deletion_info().
603
+ */
604
+ scan = table_beginscan (rel , SnapshotAny , 0 , NULL );
605
+ scanslot = table_slot_create (rel , NULL );
606
+
607
+ table_rescan (scan , NULL );
608
+
609
+ /* Try to find the tuple */
610
+ while (table_scan_getnextslot (scan , ForwardScanDirection , scanslot ))
611
+ {
612
+ if (!tuples_equal (scanslot , searchslot , eq , indexbitmap ))
613
+ continue ;
614
+
615
+ update_most_recent_deletion_info (scanslot , oldestxmin , delete_xid ,
616
+ delete_time , delete_origin );
617
+ }
618
+
619
+ table_endscan (scan );
620
+ ExecDropSingleTupleTableSlot (scanslot );
621
+
622
+ return * delete_time != 0 ;
623
+ }
624
+
625
+ /*
626
+ * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
627
+ * the deleted tuple.
628
+ */
629
+ bool
630
+ RelationFindDeletedTupleInfoByIndex (Relation rel , Oid idxoid ,
631
+ TupleTableSlot * searchslot ,
632
+ TransactionId oldestxmin ,
633
+ TransactionId * delete_xid ,
634
+ RepOriginId * delete_origin ,
635
+ TimestampTz * delete_time )
636
+ {
637
+ Relation idxrel ;
638
+ ScanKeyData skey [INDEX_MAX_KEYS ];
639
+ int skey_attoff ;
640
+ IndexScanDesc scan ;
641
+ TupleTableSlot * scanslot ;
642
+ TypeCacheEntry * * eq = NULL ;
643
+ bool isIdxSafeToSkipDuplicates ;
644
+ TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr (rel );
645
+
646
+ Assert (equalTupleDescs (desc , searchslot -> tts_tupleDescriptor ));
647
+ Assert (OidIsValid (idxoid ));
648
+
649
+ * delete_xid = InvalidTransactionId ;
650
+ * delete_time = 0 ;
651
+ * delete_origin = InvalidRepOriginId ;
652
+
653
+ isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK (rel ) == idxoid );
654
+
655
+ scanslot = table_slot_create (rel , NULL );
656
+
657
+ idxrel = index_open (idxoid , RowExclusiveLock );
658
+
659
+ /* Build scan key. */
660
+ skey_attoff = build_replindex_scan_key (skey , rel , idxrel , searchslot );
661
+
662
+ /*
663
+ * Start an index scan using SnapshotAny to identify dead tuples that are
664
+ * not visible under a standard MVCC snapshot. Tuples from transactions
665
+ * not yet committed or those just committed prior to the scan are
666
+ * excluded in update_most_recent_deletion_info().
667
+ */
668
+ scan = index_beginscan (rel , idxrel , SnapshotAny , NULL , skey_attoff , 0 );
669
+
670
+ index_rescan (scan , skey , skey_attoff , NULL , 0 );
671
+
672
+ /* Try to find the tuple */
673
+ while (index_getnext_slot (scan , ForwardScanDirection , scanslot ))
674
+ {
675
+ /*
676
+ * Avoid expensive equality check if the index is primary key or
677
+ * replica identity index.
678
+ */
679
+ if (!isIdxSafeToSkipDuplicates )
680
+ {
681
+ if (eq == NULL )
682
+ eq = palloc0 (sizeof (* eq ) * scanslot -> tts_tupleDescriptor -> natts );
683
+
684
+ if (!tuples_equal (scanslot , searchslot , eq , NULL ))
685
+ continue ;
686
+ }
687
+
688
+ update_most_recent_deletion_info (scanslot , oldestxmin , delete_xid ,
689
+ delete_time , delete_origin );
690
+ }
691
+
692
+ index_endscan (scan );
693
+
694
+ index_close (idxrel , NoLock );
695
+
696
+ ExecDropSingleTupleTableSlot (scanslot );
697
+
698
+ return * delete_time != 0 ;
699
+ }
700
+
458
701
/*
459
702
* Find the tuple that violates the passed unique index (conflictindex).
460
703
*
0 commit comments