#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "port/atomics.h"
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
int ckpt_flags; /* checkpoint flags, as defined in xlog.h */
- uint32 num_backend_writes; /* counts user backend buffer writes */
- uint32 num_backend_fsync; /* counts user backend fsync calls */
+ pg_atomic_uint32 num_backend_writes; /* counts user backend buffer writes */
+ pg_atomic_uint32 num_backend_fsync; /* counts user backend fsync calls */
+ pg_atomic_uint32 ckpt_cycle; /* checkpoint sync cycle counter */
int num_requests; /* current # of requests */
int max_requests; /* allocated array size */
MemSet(CheckpointerShmem, 0, size);
SpinLockInit(&CheckpointerShmem->ckpt_lck);
CheckpointerShmem->max_requests = NBuffers;
+ pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
+ pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
+ pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
}
}
LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
- /* Count all backend writes regardless of if they fit in the queue */
- if (!AmBackgroundWriterProcess())
- CheckpointerShmem->num_backend_writes++;
-
/*
* If the checkpointer isn't running or the request queue is full, the
* backend will have to perform its own fsync request. But before forcing
* that to happen, we can try to compact the request queue.
*/
if (!AmBackgroundWriterProcess())
- CheckpointerShmem->num_backend_fsync++;
+ pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1);
LWLockRelease(CheckpointerCommLock);
return false;
}
LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
/* Transfer stats counts into pending pgstats message */
- BgWriterStats.m_buf_written_backend += CheckpointerShmem->num_backend_writes;
- BgWriterStats.m_buf_fsync_backend += CheckpointerShmem->num_backend_fsync;
-
- CheckpointerShmem->num_backend_writes = 0;
- CheckpointerShmem->num_backend_fsync = 0;
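+ /*
+ * Use atomic exchanges so that counts added concurrently by backends
+ * are not lost between reading and resetting the counters.
+ */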
+ BgWriterStats.m_buf_written_backend +=
+ pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
+ BgWriterStats.m_buf_fsync_backend +=
+ pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
/*
* We try to avoid holding the lock for a long time by copying the request
return FirstCall;
}
+
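+/*
+ * GetCheckpointSyncCycle -- return the current checkpoint sync cycle.
+ *
+ * The cycle is advanced once per mdsync() pass; md.c uses it to avoid
+ * re-registering a segment that was already marked dirty in the current
+ * cycle.
+ */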
+uint32
+GetCheckpointSyncCycle(void)
+{
+ return pg_atomic_read_u32(&CheckpointerShmem->ckpt_cycle);
+}
+
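+/*
+ * IncCheckpointSyncCycle -- advance the checkpoint sync cycle.
+ *
+ * Returns the value the counter had before the increment.
+ */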
+uint32
+IncCheckpointSyncCycle(void)
+{
+ return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1);
+}
{
File mdfd_vfd; /* fd number in fd.c's pool */
BlockNumber mdfd_segno; /* segment number, from 0 */
+ uint32 mdfd_dirtied_cycle; /* sync cycle in which this segment was last registered as dirty */
} MdfdVec;
static MemoryContext MdCxt; /* context for all MdfdVec objects */
* (Regular backends do not track pending operations locally, but forward
* them to the checkpointer.)
*/
-typedef uint16 CycleCtr; /* can be any convenient integer size */
+typedef uint32 CycleCtr; /* must match the width of the checkpoint sync cycle */
typedef struct
{
RelFileNode rnode; /* hash table key (must be first!) */
- CycleCtr cycle_ctr; /* mdsync_cycle_ctr of oldest request */
+ CycleCtr cycle_ctr; /* sync cycle of oldest request */
/* requests[f] has bit n set if we need to fsync segment n of fork f */
Bitmapset *requests[MAX_FORKNUM + 1];
/* canceled[f] is true if we canceled fsyncs for fork "recently" */
static List *pendingUnlinks = NIL;
static MemoryContext pendingOpsCxt; /* context for the above */
-static CycleCtr mdsync_cycle_ctr = 0;
static CycleCtr mdckpt_cycle_ctr = 0;
mdfd = &reln->md_seg_fds[forkNum][0];
mdfd->mdfd_vfd = fd;
mdfd->mdfd_segno = 0;
+ mdfd->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1; /* ensure first dirtying is registered */
}
/*
mdfd = &reln->md_seg_fds[forknum][0];
mdfd->mdfd_vfd = fd;
mdfd->mdfd_segno = 0;
+ mdfd->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1; /* ensure first dirtying is registered */
Assert(_mdnblocks(reln, forknum, mdfd) <= ((BlockNumber) RELSEG_SIZE));
* To avoid excess fsync'ing (in the worst case, maybe a never-terminating
* checkpoint), we want to ignore fsync requests that are entered into the
* hashtable after this point --- they should be processed next time,
- * instead. We use mdsync_cycle_ctr to tell old entries apart from new
- * ones: new ones will have cycle_ctr equal to the incremented value of
- * mdsync_cycle_ctr.
+ * instead. We use the checkpoint sync cycle to tell old entries apart
+ * from new ones: new ones will have cycle_ctr equal to the value of
+ * GetCheckpointSyncCycle() after IncCheckpointSyncCycle() has advanced it.
*
* In normal circumstances, all entries present in the table at this point
* will have cycle_ctr exactly equal to the current (about to be old)
hash_seq_init(&hstat, pendingOpsTable);
while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
{
- entry->cycle_ctr = mdsync_cycle_ctr;
+ entry->cycle_ctr = GetCheckpointSyncCycle();
}
}
- /* Advance counter so that new hashtable entries are distinguishable */
- mdsync_cycle_ctr++;
-
/* Set flag to detect failure if we don't reach the end of the loop */
mdsync_in_progress = true;
+ /* Advance counter so that new hashtable entries are distinguishable */
+ IncCheckpointSyncCycle();
+
/* Now scan the hashtable for fsync requests to process */
absorb_counter = FSYNCS_PER_ABSORB;
hash_seq_init(&hstat, pendingOpsTable);
* contain multiple fsync-request bits, but they are all new. Note
* "continue" bypasses the hash-remove call at the bottom of the loop.
*/
- if (entry->cycle_ctr == mdsync_cycle_ctr)
+ if (entry->cycle_ctr == GetCheckpointSyncCycle())
continue;
/* Else assert we haven't missed it */
- Assert((CycleCtr) (entry->cycle_ctr + 1) == mdsync_cycle_ctr);
+ Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
/*
* Scan over the forks and segments represented by the entry.
break;
}
if (forknum <= MAX_FORKNUM)
- entry->cycle_ctr = mdsync_cycle_ctr;
+ entry->cycle_ctr = GetCheckpointSyncCycle();
else
{
/* Okay to remove it */
static void
register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
{
+ uint32 cycle;
+
/* Temp relations should never be fsync'd */
Assert(!SmgrIsTemp(reln));
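+ /*
+ * Read the sync cycle after a full barrier, so the read cannot be
+ * reordered before the preceding write that dirtied this segment.
+ */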
+ pg_memory_barrier();
+ cycle = GetCheckpointSyncCycle();
+
+ /*
+ * Don't repeatedly register the same segment as dirty.
+ *
+ * FIXME: This doesn't correctly deal with counter wraparound yet! We
+ * could, e.g., emit an smgr invalidation every now and then, use a
+ * 64-bit counter, or just error out if the cycle reaches UINT32_MAX.
+ */
+ if (seg->mdfd_dirtied_cycle == cycle)
+ return;
+
if (pendingOpsTable)
{
/* push it into local pending-ops table */
RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno);
+ seg->mdfd_dirtied_cycle = cycle;
}
else
{
if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno))
+ {
+ seg->mdfd_dirtied_cycle = cycle;
return; /* passed it off successfully */
+ }
ereport(DEBUG1,
(errmsg("could not forward fsync request because request queue is full")));
/* if new entry, initialize it */
if (!found)
{
- entry->cycle_ctr = mdsync_cycle_ctr;
+ entry->cycle_ctr = GetCheckpointSyncCycle();
MemSet(entry->requests, 0, sizeof(entry->requests));
MemSet(entry->canceled, 0, sizeof(entry->canceled));
}
v = &reln->md_seg_fds[forknum][segno];
v->mdfd_vfd = fd;
v->mdfd_segno = segno;
+ v->mdfd_dirtied_cycle = GetCheckpointSyncCycle() - 1; /* ensure first dirtying is registered */
Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));