Skip to content

Commit fd5a1a0

Browse files
author
Amit Kapila
committed
Detect and report update_deleted conflicts.
This enhancement builds upon the infrastructure introduced in commit 228c370, which enables the preservation of deleted tuples and their origin information on the subscriber. This capability is crucial for handling concurrent transactions replicated from remote nodes. The update introduces support for detecting update_deleted conflicts during the application of update operations on the subscriber. When an update operation fails to locate the target row-typically because it has been concurrently deleted-we perform an additional table scan. This scan uses the SnapshotAny mechanism and we do this additional scan only when the retain_dead_tuples option is enabled for the relevant subscription. The goal of this scan is to locate the most recently deleted tuple-matching the old column values from the remote update-that has not yet been removed by VACUUM and is still visible according to our slot (i.e., its deletion is not older than conflict-detection-slot's xmin). If such a tuple is found, the system reports an update_deleted conflict, including the origin and transaction details responsible for the deletion. This provides a groundwork for more robust and accurate conflict resolution process, preventing unexpected behavior by correctly identifying cases where a remote update clashes with a deletion from another origin. Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Nisha Moond <nisha.moond412@gmail.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 5c8eda1 commit fd5a1a0

File tree

16 files changed

+566
-46
lines changed

16 files changed

+566
-46
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8087,7 +8087,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
80878087
<structfield>subretaindeadtuples</structfield> <type>bool</type>
80888088
</para>
80898089
<para>
8090-
If true, the information (e.g., dead tuples, commit timestamps, and
8090+
If true, the detection of <xref linkend="conflict-update-deleted"/> is
8091+
enabled and the information (e.g., dead tuples, commit timestamps, and
80918092
origins) on the subscriber that is useful for conflict detection is
80928093
retained.
80938094
</para></entry>

doc/src/sgml/logical-replication.sgml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,6 +1804,22 @@ Publications:
18041804
</para>
18051805
</listitem>
18061806
</varlistentry>
1807+
<varlistentry id="conflict-update-deleted" xreflabel="update_deleted">
1808+
<term><literal>update_deleted</literal></term>
1809+
<listitem>
1810+
<para>
1811+
The tuple to be updated was concurrently deleted by another origin. The
1812+
update will simply be skipped in this scenario. Note that this conflict
1813+
can only be detected when
1814+
<link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
1815+
and <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
1816+
are enabled. Note that if a tuple cannot be found due to the table being
1817+
truncated, only a <literal>update_missing</literal> conflict will
1818+
arise. Additionally, if the tuple was deleted by the same origin, an
1819+
<literal>update_missing</literal> conflict will arise.
1820+
</para>
1821+
</listitem>
1822+
</varlistentry>
18071823
<varlistentry id="conflict-update-missing" xreflabel="update_missing">
18081824
<term><literal>update_missing</literal></term>
18091825
<listitem>

doc/src/sgml/monitoring.sgml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2223,6 +2223,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
22232223
</para></entry>
22242224
</row>
22252225

2226+
<row>
2227+
<entry role="catalog_table_entry"><para role="column_definition">
2228+
<structfield>confl_update_deleted</structfield> <type>bigint</type>
2229+
</para>
2230+
<para>
2231+
Number of times the tuple to be updated was concurrently deleted by
2232+
another source during the application of changes. See <xref linkend="conflict-update-deleted"/>
2233+
for details about this conflict.
2234+
</para></entry>
2235+
</row>
2236+
22262237
<row>
22272238
<entry role="catalog_table_entry"><para role="column_definition">
22282239
<structfield>confl_update_missing</structfield> <type>bigint</type>

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -445,10 +445,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
445445
Specifies whether the information (e.g., dead tuples, commit
446446
timestamps, and origins) required for conflict detection on the
447447
subscriber is retained. The default is <literal>false</literal>.
448-
If set to <literal>true</literal>, a physical replication slot named
449-
<quote><literal>pg_conflict_detection</literal></quote> will be
450-
created on the subscriber to prevent the conflict information from
451-
being removed.
448+
If set to <literal>true</literal>, the detection of
449+
<xref linkend="conflict-update-deleted"/> is enabled, and a physical
450+
replication slot named <quote><literal>pg_conflict_detection</literal></quote>
451+
created on the subscriber to prevent the information for detecting
452+
conflicts from being removed.
452453
</para>
453454

454455
<para>

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,6 +1399,7 @@ CREATE VIEW pg_stat_subscription_stats AS
13991399
ss.confl_insert_exists,
14001400
ss.confl_update_origin_differs,
14011401
ss.confl_update_exists,
1402+
ss.confl_update_deleted,
14021403
ss.confl_update_missing,
14031404
ss.confl_delete_origin_differs,
14041405
ss.confl_delete_missing,

src/backend/executor/execReplication.c

Lines changed: 247 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
#include "postgres.h"
1616

17+
#include "access/commit_ts.h"
1718
#include "access/genam.h"
1819
#include "access/gist.h"
1920
#include "access/relscan.h"
2021
#include "access/tableam.h"
2122
#include "access/transam.h"
2223
#include "access/xact.h"
24+
#include "access/heapam.h"
2325
#include "catalog/pg_am_d.h"
2426
#include "commands/trigger.h"
2527
#include "executor/executor.h"
@@ -36,7 +38,7 @@
3638

3739

3840
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
39-
TypeCacheEntry **eq);
41+
TypeCacheEntry **eq, Bitmapset *columns);
4042

4143
/*
4244
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
@@ -221,7 +223,7 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
221223
if (eq == NULL)
222224
eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
223225

224-
if (!tuples_equal(outslot, searchslot, eq))
226+
if (!tuples_equal(outslot, searchslot, eq, NULL))
225227
continue;
226228
}
227229

@@ -277,10 +279,13 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
277279

278280
/*
279281
* Compare the tuples in the slots by checking if they have equal values.
282+
*
283+
* If 'columns' is not null, only the columns specified within it will be
284+
* considered for the equality check, ignoring all other columns.
280285
*/
281286
static bool
282287
tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
283-
TypeCacheEntry **eq)
288+
TypeCacheEntry **eq, Bitmapset *columns)
284289
{
285290
int attrnum;
286291

@@ -305,6 +310,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
305310
if (att->attisdropped || att->attgenerated)
306311
continue;
307312

313+
/*
314+
* Ignore columns that are not listed for checking.
315+
*/
316+
if (columns &&
317+
!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
318+
columns))
319+
continue;
320+
308321
/*
309322
* If one value is NULL and other is not, then they are certainly not
310323
* equal
@@ -380,7 +393,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
380393
/* Try to find the tuple */
381394
while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
382395
{
383-
if (!tuples_equal(scanslot, searchslot, eq))
396+
if (!tuples_equal(scanslot, searchslot, eq, NULL))
384397
continue;
385398

386399
found = true;
@@ -455,6 +468,236 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
455468
}
456469
}
457470

471+
/*
472+
* If the tuple is recently dead and was deleted by a transaction with a newer
473+
* commit timestamp than previously recorded, update the associated transaction
474+
* ID, commit time, and origin. This helps ensure that conflict detection uses
475+
* the most recent and relevant deletion metadata.
476+
*/
477+
static void
478+
update_most_recent_deletion_info(TupleTableSlot *scanslot,
479+
TransactionId oldestxmin,
480+
TransactionId *delete_xid,
481+
TimestampTz *delete_time,
482+
RepOriginId *delete_origin)
483+
{
484+
BufferHeapTupleTableSlot *hslot;
485+
HeapTuple tuple;
486+
Buffer buf;
487+
bool recently_dead = false;
488+
TransactionId xmax;
489+
TimestampTz localts;
490+
RepOriginId localorigin;
491+
492+
hslot = (BufferHeapTupleTableSlot *) scanslot;
493+
494+
tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
495+
buf = hslot->buffer;
496+
497+
LockBuffer(buf, BUFFER_LOCK_SHARE);
498+
499+
/*
500+
* We do not consider HEAPTUPLE_DEAD status because it indicates either
501+
* tuples whose inserting transaction was aborted (meaning there is no
502+
* commit timestamp or origin), or tuples deleted by a transaction older
503+
* than oldestxmin, making it safe to ignore them during conflict
504+
* detection (See comments atop worker.c for details).
505+
*/
506+
if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
507+
recently_dead = true;
508+
509+
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
510+
511+
if (!recently_dead)
512+
return;
513+
514+
xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
515+
if (!TransactionIdIsValid(xmax))
516+
return;
517+
518+
/* Select the dead tuple with the most recent commit timestamp */
519+
if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
520+
TimestampDifferenceExceeds(*delete_time, localts, 0))
521+
{
522+
*delete_xid = xmax;
523+
*delete_time = localts;
524+
*delete_origin = localorigin;
525+
}
526+
}
527+
528+
/*
529+
* Searches the relation 'rel' for the most recently deleted tuple that matches
530+
* the values in 'searchslot' and is not yet removable by VACUUM. The function
531+
* returns the transaction ID, origin, and commit timestamp of the transaction
532+
* that deleted this tuple.
533+
*
534+
* 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
535+
* with IDs >= 'oldestxmin' are considered recently dead and are eligible for
536+
* conflict detection.
537+
*
538+
* Instead of stopping at the first match, we scan all matching dead tuples to
539+
* identify most recent deletion. This is crucial because only the latest
540+
* deletion is relevant for resolving conflicts.
541+
*
542+
* For example, consider a scenario on the subscriber where a row is deleted,
543+
* re-inserted, and then deleted again only on the subscriber:
544+
*
545+
* - (pk, 1) - deleted at 9:00,
546+
* - (pk, 1) - deleted at 9:02,
547+
*
548+
* Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
549+
*
550+
* If we mistakenly return the older deletion (9:00), the system may wrongly
551+
* apply the remote update using a last-update-wins strategy. Instead, we must
552+
* recognize the more recent deletion at 9:02 and skip the update. See
553+
* comments atop worker.c for details. Note, as of now, conflict resolution
554+
* is not implemented. Consequently, the system may incorrectly report the
555+
* older tuple as the conflicted one, leading to misleading results.
556+
*
557+
* The commit timestamp of the deleting transaction is used to determine which
558+
* tuple was deleted most recently.
559+
*/
560+
bool
561+
RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
562+
TransactionId oldestxmin,
563+
TransactionId *delete_xid,
564+
RepOriginId *delete_origin,
565+
TimestampTz *delete_time)
566+
{
567+
TupleTableSlot *scanslot;
568+
TableScanDesc scan;
569+
TypeCacheEntry **eq;
570+
Bitmapset *indexbitmap;
571+
TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
572+
573+
Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
574+
575+
*delete_xid = InvalidTransactionId;
576+
*delete_origin = InvalidRepOriginId;
577+
*delete_time = 0;
578+
579+
/*
580+
* If the relation has a replica identity key or a primary key that is
581+
* unusable for locating deleted tuples (see
582+
* IsIndexUsableForFindingDeletedTuple), a full table scan becomes
583+
* necessary. In such cases, comparing the entire tuple is not required,
584+
* since the remote tuple might not include all column values. Instead,
585+
* the indexed columns alone are suffcient to identify the target tuple
586+
* (see logicalrep_rel_mark_updatable).
587+
*/
588+
indexbitmap = RelationGetIndexAttrBitmap(rel,
589+
INDEX_ATTR_BITMAP_IDENTITY_KEY);
590+
591+
/* fallback to PK if no replica identity */
592+
if (!indexbitmap)
593+
indexbitmap = RelationGetIndexAttrBitmap(rel,
594+
INDEX_ATTR_BITMAP_PRIMARY_KEY);
595+
596+
eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
597+
598+
/*
599+
* Start a heap scan using SnapshotAny to identify dead tuples that are
600+
* not visible under a standard MVCC snapshot. Tuples from transactions
601+
* not yet committed or those just committed prior to the scan are
602+
* excluded in update_most_recent_deletion_info().
603+
*/
604+
scan = table_beginscan(rel, SnapshotAny, 0, NULL);
605+
scanslot = table_slot_create(rel, NULL);
606+
607+
table_rescan(scan, NULL);
608+
609+
/* Try to find the tuple */
610+
while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
611+
{
612+
if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
613+
continue;
614+
615+
update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
616+
delete_time, delete_origin);
617+
}
618+
619+
table_endscan(scan);
620+
ExecDropSingleTupleTableSlot(scanslot);
621+
622+
return *delete_time != 0;
623+
}
624+
625+
/*
626+
* Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
627+
* the deleted tuple.
628+
*/
629+
bool
630+
RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
631+
TupleTableSlot *searchslot,
632+
TransactionId oldestxmin,
633+
TransactionId *delete_xid,
634+
RepOriginId *delete_origin,
635+
TimestampTz *delete_time)
636+
{
637+
Relation idxrel;
638+
ScanKeyData skey[INDEX_MAX_KEYS];
639+
int skey_attoff;
640+
IndexScanDesc scan;
641+
TupleTableSlot *scanslot;
642+
TypeCacheEntry **eq = NULL;
643+
bool isIdxSafeToSkipDuplicates;
644+
TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
645+
646+
Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
647+
Assert(OidIsValid(idxoid));
648+
649+
*delete_xid = InvalidTransactionId;
650+
*delete_time = 0;
651+
*delete_origin = InvalidRepOriginId;
652+
653+
isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
654+
655+
scanslot = table_slot_create(rel, NULL);
656+
657+
idxrel = index_open(idxoid, RowExclusiveLock);
658+
659+
/* Build scan key. */
660+
skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
661+
662+
/*
663+
* Start an index scan using SnapshotAny to identify dead tuples that are
664+
* not visible under a standard MVCC snapshot. Tuples from transactions
665+
* not yet committed or those just committed prior to the scan are
666+
* excluded in update_most_recent_deletion_info().
667+
*/
668+
scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
669+
670+
index_rescan(scan, skey, skey_attoff, NULL, 0);
671+
672+
/* Try to find the tuple */
673+
while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
674+
{
675+
/*
676+
* Avoid expensive equality check if the index is primary key or
677+
* replica identity index.
678+
*/
679+
if (!isIdxSafeToSkipDuplicates)
680+
{
681+
if (eq == NULL)
682+
eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
683+
684+
if (!tuples_equal(scanslot, searchslot, eq, NULL))
685+
continue;
686+
}
687+
688+
update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
689+
delete_time, delete_origin);
690+
}
691+
692+
index_endscan(scan);
693+
694+
index_close(idxrel, NoLock);
695+
696+
ExecDropSingleTupleTableSlot(scanslot);
697+
698+
return *delete_time != 0;
699+
}
700+
458701
/*
459702
* Find the tuple that violates the passed unique index (conflictindex).
460703
*

0 commit comments

Comments
 (0)