From 37d9bbac21fdc185cbbb689536204845a295e07b Mon Sep 17 00:00:00 2001 From: Daniel Shelepanov Date: Sun, 9 Apr 2023 21:07:02 +0300 Subject: [PATCH 1/5] max lsn --- engine.c | 79 ++++++++++++++++++++++++++++++++++++++++++++++++-------- engine.h | 2 ++ ptrack.c | 13 ++++++++++ 3 files changed, 83 insertions(+), 11 deletions(-) diff --git a/engine.c b/engine.c index afaae1f..5117aee 100644 --- a/engine.c +++ b/engine.c @@ -675,6 +675,34 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid) /* * Mark modified block in ptrack_map. */ +static void swap_slots(size_t *slot1, size_t *slot2) { + *slot1 ^= *slot2; + *slot2 = *slot1 ^ *slot2; + *slot1 = *slot1 ^ *slot2; +} + +static void +ptrack_mark_map_pair(size_t slot1, size_t slot2, XLogRecPtr new_lsn) +{ + /* + * We use pg_atomic_uint64 here only for alignment purposes, because + * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. + */ + pg_atomic_uint64 old_lsn; + + /* Atomically assign new LSN value to the first slot */ + old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot1]); + elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot1, old_lsn.value, new_lsn); + while (old_lsn.value < new_lsn && + !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot1], (uint64 *) &old_lsn.value, new_lsn)); + + /* And to the second */ + old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot2]); + elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot2, old_lsn.value, new_lsn); + while (old_lsn.value < new_lsn && + !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot2], (uint64 *) &old_lsn.value, new_lsn)); +} + void ptrack_mark_block(RelFileNodeBackend smgr_rnode, ForkNumber forknum, BlockNumber blocknum) @@ -683,12 +711,13 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, uint64 hash; size_t slot1; size_t slot2; + size_t max_lsn_slot1; + size_t max_lsn_slot2; XLogRecPtr new_lsn; /* * We use pg_atomic_uint64 here only for alignment purposes, because * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. */ - pg_atomic_uint64 old_lsn; pg_atomic_uint64 old_init_lsn; if (ptrack_map_size == 0 @@ -705,6 +734,14 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, slot1 = (size_t)(hash % PtrackContentNblocks); slot2 = (size_t)(((hash << 32) | (hash >> 32)) % PtrackContentNblocks); + bid.blocknum = InvalidBlockNumber; + hash = BID_HASH_FUNC(bid); + max_lsn_slot1 = (size_t)(hash % PtrackContentNblocks); + max_lsn_slot2 = max_lsn_slot1 + 1; + + if (max_lsn_slot2 < max_lsn_slot1) + swap_slots(&max_lsn_slot1, &max_lsn_slot2); + if (RecoveryInProgress()) new_lsn = GetXLogReplayRecPtr(NULL); else @@ -720,15 +757,35 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, !pg_atomic_compare_exchange_u64(&ptrack_map->init_lsn, (uint64 *) &old_init_lsn.value, new_lsn)); } - /* Atomically assign new LSN value to the first slot */ - old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot1]); - elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot1, old_lsn.value, new_lsn); - while (old_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot1], (uint64 *) &old_lsn.value, new_lsn)); + // mark the page + ptrack_mark_map_pair(slot1, slot2, new_lsn); + // mark the file (new LSN is always valid maximum LSN) + ptrack_mark_map_pair(max_lsn_slot1, max_lsn_slot2, new_lsn); +} - /* And to the second */ - old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot2]); - elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot2, old_lsn.value, new_lsn); - while (old_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot2], (uint64 *) &old_lsn.value, new_lsn)); +XLogRecPtr ptrack_read_file_maxlsn(RelFileNode rnode, ForkNumber forknum) +{ + PtBlockId bid; + uint64 hash; + size_t slot1; + size_t slot2; + XLogRecPtr update_lsn1; + XLogRecPtr update_lsn2; + + bid.relnode = rnode; + bid.forknum = forknum; + bid.blocknum = InvalidBlockNumber; + + hash = BID_HASH_FUNC(bid); + + slot1 = (size_t)(hash % PtrackContentNblocks); + slot2 = slot1 + 1; + + if (slot2 < slot1) + swap_slots(&slot1, &slot2); + + update_lsn1 = pg_atomic_read_u64(&ptrack_map->entries[slot1]); + update_lsn2 = pg_atomic_read_u64(&ptrack_map->entries[slot2]); + + return update_lsn1 == update_lsn2 ? update_lsn1 : InvalidXLogRecPtr; } diff --git a/engine.h b/engine.h index 56777fc..d6f8c79 100644 --- a/engine.h +++ b/engine.h @@ -115,6 +115,8 @@ extern void assign_ptrack_map_size(int newval, void *extra); extern void ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid); extern void ptrack_mark_block(RelFileNodeBackend smgr_rnode, ForkNumber forkno, BlockNumber blkno); +extern XLogRecPtr ptrack_read_file_maxlsn(RelFileNode smgr_rnode, + ForkNumber forknum); extern bool is_cfm_file_path(const char *path); #ifdef PGPRO_EE diff --git a/ptrack.c b/ptrack.c index 22a2acf..abce256 100644 --- a/ptrack.c +++ b/ptrack.c @@ -410,6 +410,7 @@ ptrack_filelist_getnext(PtScanCtx * ctx) char *fullpath; struct stat fst; off_t rel_st_size = 0; + XLogRecPtr maxlsn; #if CFS_SUPPORT RelFileNodeBackend rnodebackend; #endif @@ -461,6 +462,18 @@ ptrack_filelist_getnext(PtScanCtx * ctx) goto get_next; } + maxlsn = ptrack_read_file_maxlsn(pfl->relnode, pfl->forknum); + + if (maxlsn < ctx->lsn) + { + elog(DEBUG3, "ptrack: skip file %s: maxlsn is %X/%X, expected %X/%X", + fullpath, (uint32) (maxlsn >> 32), (uint32) maxlsn, + (uint32) (ctx->lsn >> 32), (uint32) ctx->lsn); + + /* Try the next one */ + goto get_next; + } + #if CFS_SUPPORT nodeOf(rnodebackend) = ctx->bid.relnode; rnodebackend.backend = InvalidBackendId; From 6a8d7485fcc186356c8b712f35da5fbc70998728 Mon Sep 17 00:00:00 2001 From: Daniel Shelepanov Date: Mon, 17 Apr 2023 23:19:42 +0300 Subject: [PATCH 2/5] 32-bit lsn with wrap-around --- engine.c | 87 +++++++++++++++++++++++++++++++++++--------------------- engine.h | 11 ++++--- ptrack.c | 30 ++++++++++++------- ptrack.h | 4 ++- 4 files changed, 85 insertions(+), 47 deletions(-) diff --git a/engine.c b/engine.c index 5117aee..e07c26e 100644 --- a/engine.c +++ b/engine.c @@ -258,7 +258,7 @@ ptrackMapReadFromFile(const char *ptrack_path) * postmaster is the only user right now. */ elog(DEBUG1, "ptrack read map: crc %u, file_crc %u, init_lsn %X/%X", - crc, *file_crc, (uint32) (ptrack_map->init_lsn.value >> 32), (uint32) ptrack_map->init_lsn.value); + crc, *file_crc, (uint16) (ptrack_map->init_lsn.value >> 16), (uint16) ptrack_map->init_lsn.value); if (!EQ_CRC32C(*file_crc, crc)) { @@ -330,7 +330,7 @@ ptrackMapInit(void) * Fill entries with InvalidXLogRecPtr * (InvalidXLogRecPtr is actually 0) */ - memset(ptrack_map->entries, 0, PtrackContentNblocks * sizeof(pg_atomic_uint64)); + memset(ptrack_map->entries, 0, PtrackContentNblocks * sizeof(pg_atomic_uint32)); /* * Last part of memory representation of ptrack_map (crc) is actually unused * so leave it as it is @@ -348,11 +348,15 @@ ptrackCheckpoint(void) pg_crc32c crc; char ptrack_path[MAXPGPATH]; char ptrack_path_tmp[MAXPGPATH]; - XLogRecPtr init_lsn; - pg_atomic_uint64 buf[PTRACK_BUF_SIZE]; + uint32 init_lsn; + pg_atomic_uint32 buf[PTRACK_BUF_SIZE]; struct stat stat_buf; uint64 i = 0; uint64 j = 0; + XLogRecPtr new_init_lsn; + uint32 new_init_lsn32; + uint32 latest_lsn; + bool lsn_was_advanced = false; elog(DEBUG1, "ptrack checkpoint"); @@ -408,20 +412,27 @@ ptrackCheckpoint(void) ptrack_write_chunk(ptrack_tmp_fd, &crc, (char *) ptrack_map, offsetof(PtrackMapHdr, init_lsn)); - init_lsn = pg_atomic_read_u64(&ptrack_map->init_lsn); + latest_lsn = pg_atomic_read_u32(&ptrack_map->latest_lsn); + init_lsn = pg_atomic_read_u32(&ptrack_map->init_lsn); /* Set init_lsn during checkpoint if it is not set yet */ if (init_lsn == InvalidXLogRecPtr) { - XLogRecPtr new_init_lsn; - if (RecoveryInProgress()) new_init_lsn = GetXLogReplayRecPtr(NULL); else new_init_lsn = GetXLogInsertRecPtr(); - pg_atomic_write_u64(&ptrack_map->init_lsn, new_init_lsn); - init_lsn = new_init_lsn; + new_init_lsn32 = (uint32)(new_init_lsn >> 16); + pg_atomic_write_u32(&ptrack_map->init_lsn, new_init_lsn32); + init_lsn = new_init_lsn32; + } + else if (lsn_diff(lsn_advance(init_lsn, PtrackLSNGap), latest_lsn) < 0) + { + new_init_lsn32 = lsn_advance(init_lsn, PtrackLSNGap); + lsn_was_advanced = true; + pg_atomic_write_u32(&ptrack_map->init_lsn, new_init_lsn32); + init_lsn = new_init_lsn32; } /* Put init_lsn in the same buffer */ @@ -435,7 +446,7 @@ ptrackCheckpoint(void) */ while (i < PtrackContentNblocks) { - XLogRecPtr lsn; + uint32 lsn; /* * We store LSN values as pg_atomic_uint64 in the ptrack map, but @@ -445,8 +456,12 @@ ptrackCheckpoint(void) * * TODO: is it safe and can we do any better? */ - lsn = pg_atomic_read_u64(&ptrack_map->entries[i]); - buf[j].value = lsn; + lsn = pg_atomic_read_u32(&ptrack_map->entries[i]); + + if (lsn_was_advanced && lsn_diff(lsn, init_lsn) < 0) + buf[j].value = InvalidXLogRecPtr; + else + buf[j].value = lsn; i++; j++; @@ -464,7 +479,6 @@ ptrackCheckpoint(void) ptrack_write_chunk(ptrack_tmp_fd, &crc, (char *) buf, writesz); elog(DEBUG5, "ptrack checkpoint: i " UINT64_FORMAT ", j " UINT64_FORMAT ", writesz %zu PtrackContentNblocks " UINT64_FORMAT, i, j, writesz, (uint64) PtrackContentNblocks); - j = 0; } } @@ -472,7 +486,7 @@ ptrackCheckpoint(void) /* Write if anything left */ if ((i + 1) % PTRACK_BUF_SIZE != 0) { - size_t writesz = sizeof(pg_atomic_uint64) * j; + size_t writesz = sizeof(pg_atomic_uint32) * j; ptrack_write_chunk(ptrack_tmp_fd, &crc, (char *) buf, writesz); elog(DEBUG5, "ptrack checkpoint: final i " UINT64_FORMAT ", j " UINT64_FORMAT ", writesz %zu PtrackContentNblocks " UINT64_FORMAT, @@ -682,25 +696,30 @@ static void swap_slots(size_t *slot1, size_t *slot2) { } static void -ptrack_mark_map_pair(size_t slot1, size_t slot2, XLogRecPtr new_lsn) +ptrack_mark_map_pair(size_t slot1, size_t slot2, uint32 new_lsn32) { /* * We use pg_atomic_uint64 here only for alignment purposes, because * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. */ - pg_atomic_uint64 old_lsn; + pg_atomic_uint32 old_lsn; - /* Atomically assign new LSN value to the first slot */ - old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot1]); - elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot1, old_lsn.value, new_lsn); - while (old_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot1], (uint64 *) &old_lsn.value, new_lsn)); + /* Assign latest_lsn first */ + old_lsn.value = pg_atomic_read_u32(&ptrack_map->latest_lsn); + while (old_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->latest_lsn, (uint32 *) &old_lsn.value, new_lsn32)); + + /* Then, atomically assign new LSN value to the first slot */ + old_lsn.value = pg_atomic_read_u32(&ptrack_map->entries[slot1]); + elog(DEBUG3, "ptrack_mark_block: map[%zu]=%u <- %u", slot1, old_lsn.value, new_lsn32); + while (old_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->entries[slot1], (uint32 *) &old_lsn.value, new_lsn32)); /* And to the second */ - old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot2]); - elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot2, old_lsn.value, new_lsn); - while (old_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot2], (uint64 *) &old_lsn.value, new_lsn)); + old_lsn.value = pg_atomic_read_u32(&ptrack_map->entries[slot2]); + elog(DEBUG3, "ptrack_mark_block: map[%zu]=%u <- %u", slot2, old_lsn.value, new_lsn32); + while (old_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->entries[slot2], (uint32 *) &old_lsn.value, new_lsn32)); } void @@ -714,11 +733,13 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, size_t max_lsn_slot1; size_t max_lsn_slot2; XLogRecPtr new_lsn; + uint32 new_lsn32; /* * We use pg_atomic_uint64 here only for alignment purposes, because * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. */ - pg_atomic_uint64 old_init_lsn; + pg_atomic_uint32 old_lsn; + pg_atomic_uint32 old_init_lsn; if (ptrack_map_size == 0 || ptrack_map == NULL @@ -747,20 +768,22 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, else new_lsn = GetXLogInsertRecPtr(); + new_lsn32 = (uint32)(new_lsn >> 16); + /* Atomically assign new init LSN value */ - old_init_lsn.value = pg_atomic_read_u64(&ptrack_map->init_lsn); + old_init_lsn.value = pg_atomic_read_u32(&ptrack_map->init_lsn); if (old_init_lsn.value == InvalidXLogRecPtr) { - elog(DEBUG1, "ptrack_mark_block: init_lsn " UINT64_FORMAT " <- " UINT64_FORMAT, old_init_lsn.value, new_lsn); + elog(DEBUG1, "ptrack_mark_block: init_lsn %u <- %u", old_init_lsn.value, new_lsn32); - while (old_init_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->init_lsn, (uint64 *) &old_init_lsn.value, new_lsn)); + while (old_init_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->init_lsn, (uint32 *) &old_init_lsn.value, new_lsn32)); } // mark the page - ptrack_mark_map_pair(slot1, slot2, new_lsn); + ptrack_mark_map_pair(slot1, slot2, new_lsn32); // mark the file (new LSN is always valid maximum LSN) - ptrack_mark_map_pair(max_lsn_slot1, max_lsn_slot2, new_lsn); + ptrack_mark_map_pair(max_lsn_slot1, max_lsn_slot2, new_lsn32); } XLogRecPtr ptrack_read_file_maxlsn(RelFileNode rnode, ForkNumber forknum) diff --git a/engine.h b/engine.h index d6f8c79..212d345 100644 --- a/engine.h +++ b/engine.h @@ -65,11 +65,13 @@ typedef struct PtrackMapHdr */ uint32 version_num; + /* LSN of current writing position */ + pg_atomic_uint32 latest_lsn; /* LSN of the moment, when map was last enabled. */ - pg_atomic_uint64 init_lsn; + pg_atomic_uint32 init_lsn; /* Followed by the actual map of LSNs */ - pg_atomic_uint64 entries[FLEXIBLE_ARRAY_MEMBER]; + pg_atomic_uint32 entries[FLEXIBLE_ARRAY_MEMBER]; /* * At the end of the map CRC of type pg_crc32c is stored. @@ -80,11 +82,11 @@ typedef PtrackMapHdr * PtrackMap; /* Number of elements in ptrack map (LSN array) */ #define PtrackContentNblocks \ - ((ptrack_map_size - offsetof(PtrackMapHdr, entries) - sizeof(pg_crc32c)) / sizeof(pg_atomic_uint64)) + ((ptrack_map_size - offsetof(PtrackMapHdr, entries) - sizeof(pg_crc32c)) / sizeof(pg_atomic_uint32)) /* Actual size of the ptrack map, that we are able to fit into ptrack_map_size */ #define PtrackActualSize \ - (offsetof(PtrackMapHdr, entries) + PtrackContentNblocks * sizeof(pg_atomic_uint64) + sizeof(pg_crc32c)) + (offsetof(PtrackMapHdr, entries) + PtrackContentNblocks * sizeof(pg_atomic_uint32) + sizeof(pg_crc32c)) /* CRC32 value offset in order to directly access it in the shared memory chunk */ #define PtrackCrcOffset (PtrackActualSize - sizeof(pg_crc32c)) @@ -94,6 +96,7 @@ typedef PtrackMapHdr * PtrackMap; #define BID_HASH_FUNC(bid) \ (DatumGetUInt64(hash_any_extended((unsigned char *)&bid, sizeof(bid), 0))) +#define PtrackLSNGap 10e8 /* * Per process pointer to shared ptrack_map */ diff --git a/ptrack.c b/ptrack.c index abce256..830d47f 100644 --- a/ptrack.c +++ b/ptrack.c @@ -521,7 +521,7 @@ ptrack_init_lsn(PG_FUNCTION_ARGS) { if (ptrack_map != NULL) { - XLogRecPtr init_lsn = pg_atomic_read_u64(&ptrack_map->init_lsn); + XLogRecPtr init_lsn = (XLogRecPtr) (pg_atomic_read_u32(&ptrack_map->init_lsn) << 16); PG_RETURN_LSN(init_lsn); } @@ -546,6 +546,8 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) datapagemap_t pagemap; int64 pagecount = 0; char gather_path[MAXPGPATH]; + uint32 init_lsn = InvalidXLogRecPtr; + bool within_ptrack_map = true; /* Exit immediately if there is no map */ if (ptrack_map == NULL) @@ -554,13 +556,14 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) if (SRF_IS_FIRSTCALL()) { TupleDesc tupdesc; + XLogRecPtr lsn = PG_GETARG_LSN(0); funcctx = SRF_FIRSTCALL_INIT(); oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); ctx = (PtScanCtx *) palloc0(sizeof(PtScanCtx)); - ctx->lsn = PG_GETARG_LSN(0); + ctx->lsn = (uint32)(lsn >> 16); ctx->filelist = NIL; /* Make tuple descriptor */ @@ -610,8 +613,8 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) uint64 hash; size_t slot1; size_t slot2; - XLogRecPtr update_lsn1; - XLogRecPtr update_lsn2; + uint32 update_lsn1; + uint32 update_lsn2; /* Stop traversal if there are no more segments */ if (ctx->bid.blocknum >= ctx->relsize) @@ -651,29 +654,36 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } + init_lsn = pg_atomic_read_u32(&ptrack_map->init_lsn); hash = BID_HASH_FUNC(ctx->bid); slot1 = (size_t)(hash % PtrackContentNblocks); - update_lsn1 = pg_atomic_read_u64(&ptrack_map->entries[slot1]); + update_lsn1 = pg_atomic_read_u32(&ptrack_map->entries[slot1]); if (update_lsn1 != InvalidXLogRecPtr) elog(DEBUG3, "ptrack: update_lsn1 %X/%X of blckno %u of file %s", - (uint32) (update_lsn1 >> 32), (uint32) update_lsn1, + (uint16) (update_lsn1 >> 16), (uint16) update_lsn1, ctx->bid.blocknum, ctx->relpath); + if (init_lsn != InvalidXLogRecPtr) + within_ptrack_map = lsn_diff(init_lsn, update_lsn1) <= 0; + /* Only probe the second slot if the first one is marked */ - if (update_lsn1 >= ctx->lsn) + if (within_ptrack_map && lsn_diff(ctx->lsn, update_lsn1) <= 0) { slot2 = (size_t)(((hash << 32) | (hash >> 32)) % PtrackContentNblocks); - update_lsn2 = pg_atomic_read_u64(&ptrack_map->entries[slot2]); + update_lsn2 = pg_atomic_read_u32(&ptrack_map->entries[slot2]); if (update_lsn2 != InvalidXLogRecPtr) elog(DEBUG3, "ptrack: update_lsn2 %X/%X of blckno %u of file %s", - (uint32) (update_lsn1 >> 32), (uint32) update_lsn2, + (uint16) (update_lsn1 >> 16), (uint16) update_lsn2, ctx->bid.blocknum, ctx->relpath); + if (init_lsn != InvalidXLogRecPtr) + within_ptrack_map = lsn_diff(init_lsn, update_lsn2) <= 0; + /* Block has been changed since specified LSN. Mark it in the bitmap */ - if (update_lsn2 >= ctx->lsn) + if (within_ptrack_map && lsn_diff(ctx->lsn, update_lsn2) <= 0) { pagecount += 1; datapagemap_add(&pagemap, ctx->bid.blocknum % ((BlockNumber) RELSEG_SIZE)); diff --git a/ptrack.h b/ptrack.h index e56f60b..45a8c39 100644 --- a/ptrack.h +++ b/ptrack.h @@ -47,6 +47,8 @@ #define nodeOf(ndbck) (ndbck).node #endif +#define lsn_diff(lsn1, lsn2) ((int32)(lsn1-lsn2)) +#define lsn_advance(lsn, incr) ((uint32)(lsn+incr)) /* * Structure identifying block on the disk. */ @@ -62,7 +64,7 @@ typedef struct PtBlockId */ typedef struct PtScanCtx { - XLogRecPtr lsn; + uint32 lsn; PtBlockId bid; uint32 relsize; char *relpath; From df955c6c37c2ea5e3b85b7bc5de7224bf88174d2 Mon Sep 17 00:00:00 2001 From: Daniel Shelepanov Date: Wed, 7 Jun 2023 11:30:31 +0300 Subject: [PATCH 3/5] memory page locality --- engine.c | 36 ++++++++++++++++++++++++++++++++---- engine.h | 9 +++++++++ ptrack.c | 6 +++--- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/engine.c b/engine.c index e07c26e..f33103c 100644 --- a/engine.c +++ b/engine.c @@ -686,6 +686,35 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid) FreeDir(dir); /* we ignore any error here */ } +/* + * Get a second position within ptrack map so that it fits + * within the same cache line. + */ +size_t +get_slot2(size_t slot1, uint64 hash) { + size_t cache_line_ep; // ending point of a cache line + size_t cache_line_sp; // starting point of a cache line + size_t cache_line_interval; + size_t slot2; + + /* Get the ending point of a cache line within entries[]. */ + cache_line_ep = (CACHE_LINE_ALIGN(offsetof(PtrackMapHdr, entries) + slot1*sizeof(XLogRecPtr)) + - offsetof(PtrackMapHdr, entries)) / sizeof(XLogRecPtr); + /* handling an overflow beyond the entries boundary */ + cache_line_ep = cache_line_ep > PtrackContentNblocks ? PtrackContentNblocks : cache_line_ep; + + /* Get the starting point of a cache line within entries[]. */ + cache_line_sp = cache_line_ep - ENTRIES_PER_LINE; + + /* Handling overflow below zero (sp then must be larger than ep) */ + cache_line_sp = cache_line_sp > cache_line_ep ? 0 : cache_line_sp; + + cache_line_interval = cache_line_ep - cache_line_sp; + slot2 = (size_t)(cache_line_sp + (((hash << 32) | (hash >> 32)) % cache_line_interval)); + slot2 = (slot1 == slot2) ? ((slot1+1) % cache_line_interval) : slot2; + return slot2; +} + /* * Mark modified block in ptrack_map. */ @@ -738,7 +767,6 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, * We use pg_atomic_uint64 here only for alignment purposes, because * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. */ - pg_atomic_uint32 old_lsn; pg_atomic_uint32 old_init_lsn; if (ptrack_map_size == 0 @@ -753,7 +781,7 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, hash = BID_HASH_FUNC(bid); slot1 = (size_t)(hash % PtrackContentNblocks); - slot2 = (size_t)(((hash << 32) | (hash >> 32)) % PtrackContentNblocks); + slot2 = get_slot2(slot1, hash); bid.blocknum = InvalidBlockNumber; hash = BID_HASH_FUNC(bid); @@ -807,8 +835,8 @@ XLogRecPtr ptrack_read_file_maxlsn(RelFileNode rnode, ForkNumber forknum) if (slot2 < slot1) swap_slots(&slot1, &slot2); - update_lsn1 = pg_atomic_read_u64(&ptrack_map->entries[slot1]); - update_lsn2 = pg_atomic_read_u64(&ptrack_map->entries[slot2]); + update_lsn1 = pg_atomic_read_u32(&ptrack_map->entries[slot1]); + update_lsn2 = pg_atomic_read_u32(&ptrack_map->entries[slot2]); return update_lsn1 == update_lsn2 ? update_lsn1 : InvalidXLogRecPtr; } diff --git a/engine.h b/engine.h index 212d345..99bd4d8 100644 --- a/engine.h +++ b/engine.h @@ -40,6 +40,14 @@ */ #define PTRACK_BUF_SIZE ((uint64) 8000) +/* + * A reasonable assumption for most systems. Postgres core + * leverages the same value for this purpose. + */ +#define CACHE_LINE_SIZE 64 +#define CACHE_LINE_ALIGN(LEN) TYPEALIGN(CACHE_LINE_SIZE, (LEN)) +#define ENTRIES_PER_LINE (CACHE_LINE_SIZE/sizeof(XLogRecPtr)) + /* Ptrack magic bytes */ #define PTRACK_MAGIC "ptk" #define PTRACK_MAGIC_SIZE 4 @@ -122,6 +130,7 @@ extern XLogRecPtr ptrack_read_file_maxlsn(RelFileNode smgr_rnode, ForkNumber forknum); extern bool is_cfm_file_path(const char *path); +extern size_t get_slot2(size_t slot1, uint64 hash); #ifdef PGPRO_EE extern off_t get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode, const char *fullpath, ForkNumber forknum); diff --git a/ptrack.c b/ptrack.c index 830d47f..d4b38e3 100644 --- a/ptrack.c +++ b/ptrack.c @@ -467,8 +467,8 @@ ptrack_filelist_getnext(PtScanCtx * ctx) if (maxlsn < ctx->lsn) { elog(DEBUG3, "ptrack: skip file %s: maxlsn is %X/%X, expected %X/%X", - fullpath, (uint32) (maxlsn >> 32), (uint32) maxlsn, - (uint32) (ctx->lsn >> 32), (uint32) ctx->lsn); + fullpath, (uint16) (maxlsn >> 16), (uint16) maxlsn, + (uint16) (ctx->lsn >> 16), (uint16) ctx->lsn); /* Try the next one */ goto get_next; @@ -671,7 +671,7 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) /* Only probe the second slot if the first one is marked */ if (within_ptrack_map && lsn_diff(ctx->lsn, update_lsn1) <= 0) { - slot2 = (size_t)(((hash << 32) | (hash >> 32)) % PtrackContentNblocks); + slot2 = get_slot2(slot1, hash); update_lsn2 = pg_atomic_read_u32(&ptrack_map->entries[slot2]); if (update_lsn2 != InvalidXLogRecPtr) From 33a3f33816ab510d3227e4e6b217db18d02c9481 Mon Sep 17 00:00:00 2001 From: Daniel Shelepanov Date: Wed, 7 Jun 2023 12:02:26 +0300 Subject: [PATCH 4/5] some refactoring --- engine.c | 43 ++++++++++++++----------------------------- engine.h | 6 +++--- 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/engine.c b/engine.c index f33103c..f7baa02 100644 --- a/engine.c +++ b/engine.c @@ -692,38 +692,29 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid) */ size_t get_slot2(size_t slot1, uint64 hash) { - size_t cache_line_ep; // ending point of a cache line - size_t cache_line_sp; // starting point of a cache line - size_t cache_line_interval; + size_t memory_page_ep; // ending point of a cache line + size_t memory_page_sp; // starting point of a cache line + size_t memory_page_interval; size_t slot2; - /* Get the ending point of a cache line within entries[]. */ - cache_line_ep = (CACHE_LINE_ALIGN(offsetof(PtrackMapHdr, entries) + slot1*sizeof(XLogRecPtr)) - - offsetof(PtrackMapHdr, entries)) / sizeof(XLogRecPtr); + /* Get the ending point of a memory page within entries[]. */ + memory_page_ep = (MEMORY_PAGE_ALIGN(offsetof(PtrackMapHdr, entries) + slot1*sizeof(uint32)) + - offsetof(PtrackMapHdr, entries)) / sizeof(uint32); /* handling an overflow beyond the entries boundary */ - cache_line_ep = cache_line_ep > PtrackContentNblocks ? PtrackContentNblocks : cache_line_ep; + memory_page_ep = memory_page_ep > PtrackContentNblocks ? PtrackContentNblocks : memory_page_ep; /* Get the starting point of a cache line within entries[]. */ - cache_line_sp = cache_line_ep - ENTRIES_PER_LINE; + memory_page_sp = memory_page_ep - ENTRIES_PER_PAGE; /* Handling overflow below zero (sp then must be larger than ep) */ - cache_line_sp = cache_line_sp > cache_line_ep ? 0 : cache_line_sp; + memory_page_sp = memory_page_sp > memory_page_ep ? 0 : memory_page_sp; - cache_line_interval = cache_line_ep - cache_line_sp; - slot2 = (size_t)(cache_line_sp + (((hash << 32) | (hash >> 32)) % cache_line_interval)); - slot2 = (slot1 == slot2) ? ((slot1+1) % cache_line_interval) : slot2; + memory_page_interval = memory_page_ep - memory_page_sp; + slot2 = (size_t)(memory_page_sp + (((hash << 32) | (hash >> 32)) % memory_page_interval)); + slot2 = (slot1 == slot2) ? ((slot1+1) % memory_page_interval) : slot2; return slot2; } -/* - * Mark modified block in ptrack_map. - */ -static void swap_slots(size_t *slot1, size_t *slot2) { - *slot1 ^= *slot2; - *slot2 = *slot1 ^ *slot2; - *slot1 = *slot1 ^ *slot2; -} - static void ptrack_mark_map_pair(size_t slot1, size_t slot2, uint32 new_lsn32) { @@ -786,10 +777,7 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, bid.blocknum = InvalidBlockNumber; hash = BID_HASH_FUNC(bid); max_lsn_slot1 = (size_t)(hash % PtrackContentNblocks); - max_lsn_slot2 = max_lsn_slot1 + 1; - - if (max_lsn_slot2 < max_lsn_slot1) - swap_slots(&max_lsn_slot1, &max_lsn_slot2); + max_lsn_slot2 = (max_lsn_slot1 + 1) % PtrackContentNblocks; if (RecoveryInProgress()) new_lsn = GetXLogReplayRecPtr(NULL); @@ -830,10 +818,7 @@ XLogRecPtr ptrack_read_file_maxlsn(RelFileNode rnode, ForkNumber forknum) hash = BID_HASH_FUNC(bid); slot1 = (size_t)(hash % PtrackContentNblocks); - slot2 = slot1 + 1; - - if (slot2 < slot1) - swap_slots(&slot1, &slot2); + slot2 = (slot1 + 1) % PtrackContentNblocks; update_lsn1 = pg_atomic_read_u32(&ptrack_map->entries[slot1]); update_lsn2 = pg_atomic_read_u32(&ptrack_map->entries[slot2]); diff --git a/engine.h b/engine.h index 99bd4d8..822b13f 100644 --- a/engine.h +++ b/engine.h @@ -44,9 +44,9 @@ * A reasonable assumption for most systems. Postgres core * leverages the same value for this purpose. */ -#define CACHE_LINE_SIZE 64 -#define CACHE_LINE_ALIGN(LEN) TYPEALIGN(CACHE_LINE_SIZE, (LEN)) -#define ENTRIES_PER_LINE (CACHE_LINE_SIZE/sizeof(XLogRecPtr)) +#define MEMORY_PAGE_SIZE 4096 +#define MEMORY_PAGE_ALIGN(LEN) TYPEALIGN(MEMORY_PAGE_SIZE, (LEN)) +#define ENTRIES_PER_PAGE (MEMORY_PAGE_SIZE/sizeof(XLogRecPtr)) /* Ptrack magic bytes */ #define PTRACK_MAGIC "ptk" From 79e3bda217c762ffd6ebc732e0e5a11efe9faef4 Mon Sep 17 00:00:00 2001 From: Daniel Shelepanov Date: Fri, 9 Jun 2023 11:45:18 +0300 Subject: [PATCH 5/5] intermediate --- engine.c | 37 ++++++++++--------------------------- engine.h | 11 +++++++---- 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/engine.c b/engine.c index f7baa02..76af990 100644 --- a/engine.c +++ b/engine.c @@ -449,9 +449,9 @@ ptrackCheckpoint(void) uint32 lsn; /* - * We store LSN values as pg_atomic_uint64 in the ptrack map, but - * pg_atomic_read_u64() returns uint64. That way, we have to put this - * lsn into the buffer array of pg_atomic_uint64's. We are the only + * We store LSN values as pg_atomic_uint32 in the ptrack map, but + * pg_atomic_read_u32() returns uint32. That way, we have to put this + * lsn into the buffer array of pg_atomic_uint32's. We are the only * one who write into this buffer, so we do it without locks. * * TODO: is it safe and can we do any better? @@ -551,7 +551,7 @@ assign_ptrack_map_size(int newval, void *extra) !InitializingParallelWorker) { /* Cast to uint64 in order to avoid int32 overflow */ - ptrack_map_size = (uint64) 1024 * 1024 * newval; + ptrack_map_size = (uint64)(1024 * 1024 * newval); elog(DEBUG1, "assign_ptrack_map_size: ptrack_map_size set to " UINT64_FORMAT, ptrack_map_size); @@ -688,30 +688,13 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid) /* * Get a second position within ptrack map so that it fits - * within the same cache line. + * within the same memory page. */ -size_t -get_slot2(size_t slot1, uint64 hash) { - size_t memory_page_ep; // ending point of a cache line - size_t memory_page_sp; // starting point of a cache line - size_t memory_page_interval; - size_t slot2; - - /* Get the ending point of a memory page within entries[]. */ - memory_page_ep = (MEMORY_PAGE_ALIGN(offsetof(PtrackMapHdr, entries) + slot1*sizeof(uint32)) - - offsetof(PtrackMapHdr, entries)) / sizeof(uint32); - /* handling an overflow beyond the entries boundary */ - memory_page_ep = memory_page_ep > PtrackContentNblocks ? PtrackContentNblocks : memory_page_ep; - - /* Get the starting point of a cache line within entries[]. */ - memory_page_sp = memory_page_ep - ENTRIES_PER_PAGE; - - /* Handling overflow below zero (sp then must be larger than ep) */ - memory_page_sp = memory_page_sp > memory_page_ep ? 0 : memory_page_sp; - - memory_page_interval = memory_page_ep - memory_page_sp; - slot2 = (size_t)(memory_page_sp + (((hash << 32) | (hash >> 32)) % memory_page_interval)); - slot2 = (slot1 == slot2) ? ((slot1+1) % memory_page_interval) : slot2; +inline size_t +get_slot2(size_t slot1, uint32 hash) { + size_t slot2; + slot2 = TYPEALIGN_DOWN(ENTRIES_PER_PAGE, slot1) + ((hash << 16) | (hash >> 16)) % ENTRIES_PER_PAGE; + slot2 = slot1 == slot2 ? slot2+1 : slot2; return slot2; } diff --git a/engine.h b/engine.h index 822b13f..98f2ceb 100644 --- a/engine.h +++ b/engine.h @@ -44,9 +44,10 @@ * A reasonable assumption for most systems. Postgres core * leverages the same value for this purpose. */ -#define MEMORY_PAGE_SIZE 4096 -#define MEMORY_PAGE_ALIGN(LEN) TYPEALIGN(MEMORY_PAGE_SIZE, (LEN)) -#define ENTRIES_PER_PAGE (MEMORY_PAGE_SIZE/sizeof(XLogRecPtr)) +#define MEMORY_PAGE_SIZE 4096 +#define MEMORY_PAGE_ALIGN(LEN) TYPEALIGN(MEMORY_PAGE_SIZE, (LEN)) +#define MEMORY_PAGE_ALIGN_DOWN(LEN) TYPEALIGN_DOWN(MEMORY_PAGE_SIZE, (LEN)) +#define ENTRIES_PER_PAGE (MEMORY_PAGE_SIZE/sizeof(uint32)) /* Ptrack magic bytes */ #define PTRACK_MAGIC "ptk" @@ -73,6 +74,8 @@ typedef struct PtrackMapHdr */ uint32 version_num; + /* Padding needed to align entries[] by the page boundary */ + char padding[4096 - PTRACK_MAGIC_SIZE - sizeof(uint32) - 2*sizeof(pg_atomic_uint32)]; /* LSN of current writing position */ pg_atomic_uint32 latest_lsn; /* LSN of the moment, when map was last enabled. */ @@ -130,7 +133,7 @@ extern XLogRecPtr ptrack_read_file_maxlsn(RelFileNode smgr_rnode, ForkNumber forknum); extern bool is_cfm_file_path(const char *path); -extern size_t get_slot2(size_t slot1, uint64 hash); +extern size_t get_slot2(size_t slot1, uint32 hash); #ifdef PGPRO_EE extern off_t get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode, const char *fullpath, ForkNumber forknum);