/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *    Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  We divide the input into sorted runs using replacement
 * selection, in the form of a priority tree implemented as a heap
 * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
 * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
 * are implemented by logtape.c, which avoids space wastage by recycling
 * disk space as soon as each block is read from its "tape".
 *
 * We do not form the initial runs using Knuth's recommended replacement
 * selection data structure (Algorithm 5.4.1R), because it uses a fixed
 * number of records in memory at all times.  Since we are dealing with
 * tuples that may vary considerably in size, we want to be able to vary
 * the number of records kept in memory to ensure full utilization of the
 * allowed sort memory space.  So, we keep the tuples in a variable-size
 * heap, with the next record to go out at the top of the heap.  Like
 * Algorithm 5.4.1R, each record is stored with the run number that it
 * must go into, and we use (run number, key) as the ordering key for the
 * heap.  When the run number at the top of the heap changes, we know that
 * no more records of the prior run are left in the heap.
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we construct a heap using Algorithm H and begin to emit tuples
 * into sorted runs in temporary tapes, emitting just enough tuples at each
 * step to get back within the workMem limit.  Whenever the run number at
 * the top of the heap changes, we begin a new run with a new output tape
 * (selected per Algorithm D).  After the end of the input is reached,
 * we dump out remaining tuples in memory into a final run (or two),
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and insert the
 * next tuple from its source tape (if any).  When the heap empties, the merge
 * is complete.  The basic merge algorithm thus needs very little memory ---
 * only M tuples for an M-way merge, and M is constrained to a small number.
 * However, we can still make good use of our full workMem allocation by
 * pre-reading additional tuples from each source tape.  Without prereading,
 * our access pattern to the temporary file would be very erratic; on average
 * we'd read one block from each of M source tapes during the same time that
 * we're writing M blocks to the output tape, so there is no sequentiality of
 * access at all, defeating the read-ahead methods used by most Unix kernels.
 * Worse, the output tape gets written into a very random sequence of blocks
 * of the temp file, ensuring that things will be even worse when it comes
 * time to read that tape.  A straightforward merge pass thus ends up doing a
 * lot of waiting for disk seeks.  We can improve matters by prereading from
 * each source tape sequentially, loading about workMem/M bytes from each tape
 * in turn.  Then we run the merge algorithm, writing but not reading until
 * one of the preloaded tuple series runs out.  Then we switch back to preread
 * mode, fill memory again, and repeat.  This approach helps to localize both
 * read and write accesses.
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives.  In our implementation a "tape drive"
 * doesn't cost much more than a few Kb of memory buffers, so we can afford
 * to have lots of them.  In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate any repeated I/O at all.  In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above.  Nonetheless, with large workMem we can have many tapes.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.64 2006/03/08 16:59:03 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
|
|
|
|
#include "access/heapam.h"
|
|
#include "access/nbtree.h"
|
|
#include "catalog/pg_amop.h"
|
|
#include "catalog/pg_operator.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/catcache.h"
|
|
#include "utils/datum.h"
|
|
#include "utils/logtape.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/pg_rusage.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/tuplesort.h"
|
|
|
|
|
|
/* GUC variable */
#ifdef TRACE_SORT
bool        trace_sort = false;
#endif

/*
 * The objects we actually sort are SortTuple structs.  These contain
 * a pointer to the tuple proper (might be a HeapTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree().  SortTuples also contain the tuple's
 * first key column in Datum/nullflag format, and an index integer.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.  We could extract and save all the key
 * columns not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result.  Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1.  If the datatype is pass-by-reference and isnull1 is false,
 * then datum1 points to a separately palloc'd data value that is also pointed
 * to by the "tuple" pointer; otherwise "tuple" is NULL.
 *
 * While building initial runs, tupindex holds the tuple's run number.  During
 * merge passes, we re-use it to hold the input tape number that each tuple in
 * the heap was read from, or to hold the index of the next tuple pre-read
 * from the same tape in the case of pre-read entries.  tupindex goes unused
 * if the sort occurs entirely in memory.
 */
typedef struct
{
    void       *tuple;          /* the tuple proper */
    Datum       datum1;         /* value of first key column */
    bool        isnull1;        /* is first key column NULL? */
    int         tupindex;       /* see notes above */
} SortTuple;

/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
    TSS_INITIAL,                /* Loading tuples; still within memory limit */
    TSS_BUILDRUNS,              /* Loading tuples; writing to tape */
    TSS_SORTEDINMEM,            /* Sort completed entirely in memory */
    TSS_SORTEDONTAPE,           /* Sort completed, final run is on tape */
    TSS_FINALMERGE              /* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about 3 blocks
 * worth of buffer space (which is an underestimate for very large data
 * volumes, but it's probably close enough --- see logtape.c).
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER                6       /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD    (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE       (BLCKSZ * 32)

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
    TupSortStatus status;       /* enumerated value as shown above */
    int         nKeys;          /* number of columns in sort key */
    bool        randomAccess;   /* did caller request random access? */
    long        availMem;       /* remaining memory available, in bytes */
    long        allowedMem;     /* total memory allowed, in bytes */
    int         maxTapes;       /* number of tapes (Knuth's T) */
    int         tapeRange;      /* maxTapes-1 (Knuth's P) */
    MemoryContext sortcontext;  /* memory context holding all sort data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */

    /*
     * These function pointers decouple the routines that must know what kind
     * of tuple we are sorting from the routines that don't need to know it.
     * They are set up by the tuplesort_begin_xxx routines.
     *
     * Function to compare two tuples; result is per qsort() convention, ie:
     *
     * <0, 0, >0 according as a<b, a=b, a>b.
     */
    int         (*comparetup) (Tuplesortstate *state,
                               const SortTuple *a, const SortTuple *b);

    /*
     * Function to copy a supplied input tuple into palloc'd space and set up
     * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
     * state->availMem must be decreased by the amount of space used for the
     * tuple copy (note the SortTuple struct itself is not counted).
     */
    void        (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

    /*
     * Function to write a stored tuple onto tape.  The representation of the
     * tuple on tape need not be the same as it is in memory; requirements on
     * the tape representation are given below.  After writing the tuple,
     * pfree() the out-of-line data (not the SortTuple struct!), and increase
     * state->availMem by the amount of memory space thereby released.
     */
    void        (*writetup) (Tuplesortstate *state, int tapenum,
                             SortTuple *stup);

    /*
     * Function to read a stored tuple from tape back into memory.  'len' is
     * the already-read length of the stored tuple.  Create a palloc'd copy,
     * initialize tuple/datum1/isnull1 in the target SortTuple struct,
     * and decrease state->availMem by the amount of memory space consumed.
     */
    void        (*readtup) (Tuplesortstate *state, SortTuple *stup,
                            int tapenum, unsigned int len);

    /*
     * This array holds the tuples now in sort memory.  If we are in state
     * INITIAL, the tuples are in no particular order; if we are in state
     * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
     * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
     * H.  (Note that memtupcount only counts the tuples that are part of the
     * heap --- during merge passes, memtuples[] entries beyond tapeRange are
     * never in the heap and are used to hold pre-read tuples.)  In state
     * SORTEDONTAPE, the array is not used.
     */
    SortTuple  *memtuples;      /* array of SortTuple structs */
    int         memtupcount;    /* number of tuples currently present */
    int         memtupsize;     /* allocated length of memtuples array */

    /*
     * While building initial runs, this is the current output run number
     * (starting at 0).  Afterwards, it is the number of initial runs we made.
     */
    int         currentRun;

    /*
     * Unless otherwise noted, all pointer variables below are pointers
     * to arrays of length maxTapes, holding per-tape data.
     */

    /*
     * These variables are only used during merge passes.  mergeactive[i] is
     * true if we are reading an input run from (actual) tape number i and
     * have not yet exhausted that run.  mergenext[i] is the memtuples index
     * of the next pre-read tuple (next to be loaded into the heap) for tape
     * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
     * points to the last pre-read tuple from each tape.  mergeavailmem[i] is
     * the amount of unused space allocated for tape i.  mergefreelist and
     * mergefirstfree keep track of unused locations in the memtuples[] array.
     * The memtuples[].tupindex fields link together pre-read tuples for each
     * tape as well as recycled locations in mergefreelist.  It is OK to use 0
     * as a null link in these lists, because memtuples[0] is part of the
     * merge heap and is never a pre-read tuple.  mergeslotsfree counts the
     * total number of free memtuples[] slots, both those in the freelist and
     * those beyond mergefirstfree.
     */
    bool       *mergeactive;    /* Active input run source? */
    int        *mergenext;      /* first preread tuple for each source */
    int        *mergelast;      /* last preread tuple for each source */
    long       *mergeavailmem;  /* availMem for prereading tapes */
    long        spacePerTape;   /* actual per-tape target usage */
    int         mergefreelist;  /* head of freelist of recycled slots */
    int         mergefirstfree; /* first slot never used in this merge */
    int         mergeslotsfree; /* number of free slots during merge */

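    /*
     * Illustrative sketch (an editorial addition, not in the original
     * source) of the pre-read linked lists described above: suppose tape
     * 2's pre-read tuples currently occupy memtuples[] slots 7, 11, and 9,
     * in that order.  Then mergenext[2] = 7, memtuples[7].tupindex = 11,
     * memtuples[11].tupindex = 9, memtuples[9].tupindex = 0 (end of list),
     * and mergelast[2] = 9.  Slot 0 can safely serve as the null link
     * because memtuples[0] always belongs to the merge heap, never to a
     * pre-read list.
     */
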
    /*
     * Variables for Algorithm D.  Note that destTape is a "logical" tape
     * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
     * "logical" and "actual" tape numbers straight!
     */
    int         Level;          /* Knuth's l */
    int         destTape;       /* current output tape (Knuth's j, less 1) */
    int        *tp_fib;         /* Target Fibonacci run counts (A[]) */
    int        *tp_runs;        /* # of real runs on each tape */
    int        *tp_dummy;       /* # of dummy runs for each tape (D[]) */
    int        *tp_tapenum;     /* Actual tape numbers (TAPE[]) */
    int         activeTapes;    /* # of active input tapes in merge pass */

    /*
     * These variables are used after completion of sorting to keep track of
     * the next tuple to return.  (In the tape case, the tape's current read
     * position is also critical state.)
     */
    int         result_tape;    /* actual tape number of finished output */
    int         current;        /* array index (only used if SORTEDINMEM) */
    bool        eof_reached;    /* reached EOF (needed for cursors) */

    /* markpos_xxx holds marked position for mark and restore */
    long        markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
    int         markpos_offset; /* saved "current", or offset in tape block */
    bool        markpos_eof;    /* saved "eof_reached" */

    /*
     * These variables are specific to the HeapTuple case; they are set by
     * tuplesort_begin_heap and used only by the HeapTuple routines.
     */
    TupleDesc   tupDesc;
    ScanKey     scanKeys;       /* array of length nKeys */
    SortFunctionKind *sortFnKinds;      /* array of length nKeys */

    /*
     * These variables are specific to the IndexTuple case; they are set by
     * tuplesort_begin_index and used only by the IndexTuple routines.
     */
    Relation    indexRel;
    ScanKey     indexScanKey;
    bool        enforceUnique;  /* complain if we find duplicate tuples */

    /*
     * These variables are specific to the Datum case; they are set by
     * tuplesort_begin_datum and used only by the DatumTuple routines.
     */
    Oid         datumType;
    Oid         sortOperator;
    FmgrInfo    sortOpFn;       /* cached lookup data for sortOperator */
    SortFunctionKind sortFnKind;
    /* we need typelen and byval in order to know how to copy the Datums. */
    int         datumTypeLen;
    bool        datumTypeByVal;

    /*
     * Resource snapshot for time of sort start.
     */
#ifdef TRACE_SORT
    PGRUsage    ru_start;
#endif
};

#define COMPARETUP(state,a,b)   ((*(state)->comparetup) (state, a, b))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)   ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define LACKMEM(state)      ((state)->availMem < 0)
#define USEMEM(state,amt)   ((state)->availMem -= (amt))
#define FREEMEM(state,amt)  ((state)->availMem += (amt))

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 */

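/*
 * Illustrative sketch (an editorial addition, not in the original source):
 * on-tape layout of one stored tuple under the rules above, where "len" is
 * the unsigned int length word counting itself plus the tuple data:
 *
 *      randomAccess = false:   [ len ][ tuple data ... ]
 *      randomAccess = true:    [ len ][ tuple data ... ][ len ]
 *
 * The trailing copy of len is not counted in the stored length value, and
 * a leading length word of zero marks the end of a run.
 */
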
static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                      int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int  qsort_comparetup(const void *a, const void *b);
static int comparetup_heap(Tuplesortstate *state,
                const SortTuple *a, const SortTuple *b);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum,
              SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
             int tapenum, unsigned int len);
static int comparetup_index(Tuplesortstate *state,
                 const SortTuple *a, const SortTuple *b);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum,
               SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len);
static int comparetup_datum(Tuplesortstate *state,
                 const SortTuple *a, const SortTuple *b);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum,
               SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len);

/*
 * Since qsort(3) will not pass any context info to qsort_comparetup(),
 * we have to use this ugly static variable.  It is set to point to the
 * active Tuplesortstate object just before calling qsort.  It should
 * not be used directly by anything except qsort_comparetup().
 */
static Tuplesortstate *qsort_tuplesortstate;

/*
 * tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_puttuple
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_gettuple until it returns NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * For Datum sorts, putdatum/getdatum are used instead of puttuple/gettuple.
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */

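/*
 * Illustrative usage sketch (an editorial addition, not in the original
 * source): how a caller might drive the Datum-sort API described above to
 * sort an array of int4 values in place.  The TUPLESORT_EXAMPLE guard
 * symbol is hypothetical, so this code is never compiled into the backend;
 * OID 97 is the built-in "<" operator for int4.
 */
#ifdef TUPLESORT_EXAMPLE
#include "catalog/pg_type.h"    /* for INT4OID */

static void
example_sort_int4(Datum *values, int nvalues, int sortMem)
{
    Tuplesortstate *sortstate;
    Datum       val;
    bool        isNull;
    int         i;

    /* begin collecting input; no random access to the result needed */
    sortstate = tuplesort_begin_datum(INT4OID, 97, sortMem, false);

    for (i = 0; i < nvalues; i++)
        tuplesort_putdatum(sortstate, values[i], false);

    /* no more input; sort (spilling to tape if sortMem was exceeded) */
    tuplesort_performsort(sortstate);

    /* read the values back in ascending order */
    i = 0;
    while (tuplesort_getdatum(sortstate, true, &val, &isNull))
        values[i++] = val;

    /* release all memory and any temp files */
    tuplesort_end(sortstate);
}
#endif
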
static Tuplesortstate *
tuplesort_begin_common(int workMem, bool randomAccess)
{
    Tuplesortstate *state;
    MemoryContext sortcontext;
    MemoryContext oldcontext;

    /*
     * Create a working memory context for this sort operation.
     * All data needed by the sort will live inside this context.
     */
    sortcontext = AllocSetContextCreate(CurrentMemoryContext,
                                        "TupleSort",
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);

    /*
     * Make the Tuplesortstate within the per-sort context.  This way,
     * we don't need a separate pfree() operation for it at shutdown.
     */
    oldcontext = MemoryContextSwitchTo(sortcontext);

    state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
    if (trace_sort)
        pg_rusage_init(&state->ru_start);
#endif

    state->status = TSS_INITIAL;
    state->randomAccess = randomAccess;
    state->allowedMem = workMem * 1024L;
    state->availMem = state->allowedMem;
    state->sortcontext = sortcontext;
    state->tapeset = NULL;

    state->memtupcount = 0;
    state->memtupsize = 1024;   /* initial guess */
    state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));

    USEMEM(state, GetMemoryChunkSpace(state->memtuples));

    /* workMem must be large enough for the minimal memtuples array */
    if (LACKMEM(state))
        elog(ERROR, "insufficient memory allowed for sort");

    state->currentRun = 0;

    /*
     * maxTapes, tapeRange, and Algorithm D variables will be initialized by
     * inittapes(), if needed
     */

    state->result_tape = -1;    /* flag that result tape has not been formed */

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
                     int nkeys,
                     Oid *sortOperators, AttrNumber *attNums,
                     int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext oldcontext;
    int         i;

    oldcontext = MemoryContextSwitchTo(state->sortcontext);

    AssertArg(nkeys > 0);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
             nkeys, workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = nkeys;

    state->comparetup = comparetup_heap;
    state->copytup = copytup_heap;
    state->writetup = writetup_heap;
    state->readtup = readtup_heap;

    state->tupDesc = tupDesc;   /* assume we need not copy tupDesc */
    state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
    state->sortFnKinds = (SortFunctionKind *)
        palloc0(nkeys * sizeof(SortFunctionKind));

    for (i = 0; i < nkeys; i++)
    {
        RegProcedure sortFunction;

        AssertArg(sortOperators[i] != 0);
        AssertArg(attNums[i] != 0);

        /* select a function that implements the sort operator */
        SelectSortFunction(sortOperators[i], &sortFunction,
                           &state->sortFnKinds[i]);

        /*
         * We needn't fill in sk_strategy or sk_subtype since these scankeys
         * will never be passed to an index.
         */
        ScanKeyInit(&state->scanKeys[i],
                    attNums[i],
                    InvalidStrategy,
                    sortFunction,
                    (Datum) 0);
    }

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_index(Relation indexRel,
                      bool enforceUnique,
                      int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext oldcontext;

    oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
             enforceUnique ? 't' : 'f',
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = RelationGetNumberOfAttributes(indexRel);

    state->comparetup = comparetup_index;
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;

    state->indexRel = indexRel;
    /* see comments below about btree dependence of this code... */
    state->indexScanKey = _bt_mkscankey_nodata(indexRel);
    state->enforceUnique = enforceUnique;

    MemoryContextSwitchTo(oldcontext);

    return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType,
                      Oid sortOperator,
                      int workMem, bool randomAccess)
{
    Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
    MemoryContext oldcontext;
    RegProcedure sortFunction;
    int16       typlen;
    bool        typbyval;

    oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG,
             "begin datum sort: workMem = %d, randomAccess = %c",
             workMem, randomAccess ? 't' : 'f');
#endif

    state->nKeys = 1;           /* always a one-column sort */

    state->comparetup = comparetup_datum;
    state->copytup = copytup_datum;
    state->writetup = writetup_datum;
    state->readtup = readtup_datum;

    state->datumType = datumType;
    state->sortOperator = sortOperator;

    /* select a function that implements the sort operator */
    SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
    /* and look up the function */
    fmgr_info(sortFunction, &state->sortOpFn);

    /* lookup necessary attributes of the datum type */
    get_typlenbyval(datumType, &typlen, &typbyval);
    state->datumTypeLen = typlen;
    state->datumTypeByVal = typbyval;

    MemoryContextSwitchTo(oldcontext);

    return state;
}

/*
 * tuplesort_end
 *
 * Release resources and clean up.
 *
 * NOTE: after calling this, any tuple pointers returned by tuplesort_gettuple
 * or datum pointers returned by tuplesort_getdatum are pointing to garbage.
 * Be careful not to attempt to use or free such pointers afterwards!
 */
void
tuplesort_end(Tuplesortstate *state)
{
    /* context swap probably not needed, but let's be safe */
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    long        spaceUsed;

    if (state->tapeset)
        spaceUsed = LogicalTapeSetBlocks(state->tapeset);
    else
        spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

    /*
     * Delete temporary "tape" files, if any.
     *
     * Note: want to include this in reported total cost of sort, hence
     * need for two #ifdef TRACE_SORT sections.
     */
    if (state->tapeset)
        LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
    if (trace_sort)
    {
        if (state->tapeset)
            elog(LOG, "external sort ended, %ld disk blocks used: %s",
                 spaceUsed, pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "internal sort ended, %ld KB used: %s",
                 spaceUsed, pg_rusage_show(&state->ru_start));
    }
#endif

    MemoryContextSwitchTo(oldcontext);

    /*
     * Free the per-sort memory context, thereby releasing all working
     * memory, including the Tuplesortstate struct itself.
     */
    MemoryContextDelete(state->sortcontext);
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.
 * Return TRUE if able to enlarge the array, FALSE if not.
 *
 * At each increment we double the size of the array.  When we are short
 * on memory we could consider smaller increases, but because availMem
 * moves around with tuple addition/removal, this might result in thrashing.
 * Small increases in the array size are likely to be pretty inefficient.
 */
static bool
grow_memtuples(Tuplesortstate *state)
{
    /*
     * We need to be sure that we do not cause LACKMEM to become true, else
     * the space management algorithm will go nuts.  We assume here that
     * the memory chunk overhead associated with the memtuples array is
     * constant and so there will be no unexpected addition to what we ask
     * for.  (The minimum array size established in tuplesort_begin_common
     * is large enough to force palloc to treat it as a separate chunk, so
     * this assumption should be good.  But let's check it.)
     */
    if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
        return false;

    /*
     * On a 64-bit machine, allowedMem could be high enough to get us into
     * trouble with MaxAllocSize, too.
     */
    if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
        return false;

    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    state->memtupsize *= 2;
    state->memtuples = (SortTuple *)
        repalloc(state->memtuples,
                 state->memtupsize * sizeof(SortTuple));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    if (LACKMEM(state))
        elog(ERROR, "unexpected out-of-memory situation during sort");
    return true;
}

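/*
 * Worked numbers (an editorial addition, not in the original source): on a
 * typical 64-bit platform sizeof(SortTuple) is 24 bytes, so doubling the
 * initial 1024-slot array costs roughly another 24kB.  The availMem test in
 * grow_memtuples refuses the doubling unless at least
 * memtupsize * sizeof(SortTuple) bytes remain, which (ignoring chunk
 * overhead) is exactly the increment the repalloc will request.
 */
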
/*
 * Accept one tuple while collecting input data for sort.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 */
void
tuplesort_puttuple(Tuplesortstate *state, void *tuple)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
    SortTuple   stup;

    /*
     * Copy the given tuple into memory we control, and decrease availMem.
     * Then call the code shared with the Datum case.
     */
    COPYTUP(state, &stup, tuple);

    puttuple_common(state, &stup);

    MemoryContextSwitchTo(oldcontext);
}

/*
 * Accept one Datum while collecting input data for sort.
 *
 * If the Datum is pass-by-ref type, the value will be copied.
 */
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
    SortTuple   stup;

    /*
     * If it's a pass-by-reference value, copy it into memory we control,
     * and decrease availMem.  Then call the code shared with the tuple case.
     */
    if (isNull || state->datumTypeByVal)
    {
        stup.datum1 = val;
        stup.isnull1 = isNull;
        stup.tuple = NULL;      /* no separate storage */
    }
    else
    {
        stup.datum1 = datumCopy(val, false, state->datumTypeLen);
        stup.isnull1 = false;
        stup.tuple = DatumGetPointer(stup.datum1);
        USEMEM(state, GetMemoryChunkSpace(stup.tuple));
    }

    puttuple_common(state, &stup);

    MemoryContextSwitchTo(oldcontext);
}

/*
 * Shared code for tuple and datum cases.
 */
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
    switch (state->status)
    {
        case TSS_INITIAL:

            /*
             * Save the tuple into the unsorted array.  First, grow the
             * array as needed.  Note that we try to grow the array when there
             * is still one free slot remaining --- if we fail, there'll still
             * be room to store the incoming tuple, and then we'll switch to
             * tape-based operation.
             */
            if (state->memtupcount >= state->memtupsize - 1)
            {
                (void) grow_memtuples(state);
                Assert(state->memtupcount < state->memtupsize);
            }
            state->memtuples[state->memtupcount++] = *tuple;

            /*
             * Done if we still fit in available memory and have array slots.
             */
            if (state->memtupcount < state->memtupsize && !LACKMEM(state))
                return;

            /*
             * Nope; time to switch to tape-based operation.
             */
            inittapes(state);

            /*
             * Dump tuples until we are back under the limit.
             */
            dumptuples(state, false);
            break;
        case TSS_BUILDRUNS:

            /*
             * Insert the tuple into the heap, with run number
             * currentRun if it can go into the current run, else run number
             * currentRun+1.  The tuple can go into the current run if it is
             * >= the first not-yet-output tuple.  (Actually, it could go into
             * the current run if it is >= the most recently output tuple ...
             * but that would require keeping around the tuple we last output,
             * and it's simplest to let writetup free each tuple as soon as
             * it's written.)
             *
             * Note there will always be at least one tuple in the heap at
             * this point; see dumptuples.
             */
            Assert(state->memtupcount > 0);
            if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
                tuplesort_heap_insert(state, tuple, state->currentRun, true);
            else
                tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);

            /*
             * If we are over the memory limit, dump tuples till we're under.
             */
            dumptuples(state, false);
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }
}

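/*
 * Concrete example (an editorial addition, not in the original source) of
 * the run-assignment rule in the TSS_BUILDRUNS case above: if the smallest
 * not-yet-output tuple in the heap has key 42, an incoming tuple with key
 * 50 can still be emitted as part of the current run, but an incoming tuple
 * with key 17 would break the run's sorted order, so it is tagged with run
 * number currentRun + 1 and held in the heap for the next run.
 */
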
/*
 * All tuples have been provided; finish the sort.
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "performsort starting: %s",
             pg_rusage_show(&state->ru_start));
#endif

    switch (state->status)
    {
        case TSS_INITIAL:

            /*
             * We were able to accumulate all the tuples within the allowed
             * amount of memory.  Just qsort 'em and we're done.
             */
            if (state->memtupcount > 1)
            {
                qsort_tuplesortstate = state;
                qsort((void *) state->memtuples, state->memtupcount,
                      sizeof(SortTuple), qsort_comparetup);
            }
            state->current = 0;
            state->eof_reached = false;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            state->status = TSS_SORTEDINMEM;
            break;
        case TSS_BUILDRUNS:

            /*
             * Finish tape-based sort.  First, flush all tuples remaining in
             * memory out to tape; then merge until we have a single remaining
             * run (or, if !randomAccess, one run per tape).  Note that
             * mergeruns sets the correct state->status.
             */
            dumptuples(state, true);
            mergeruns(state);
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

#ifdef TRACE_SORT
    if (trace_sort)
    {
        if (state->status == TSS_FINALMERGE)
            elog(LOG, "performsort done (except %d-way final merge): %s",
                 state->activeTapes,
                 pg_rusage_show(&state->ru_start));
        else
            elog(LOG, "performsort done: %s",
                 pg_rusage_show(&state->ru_start));
    }
#endif

    MemoryContextSwitchTo(oldcontext);
}

/*
 * Internal routine to fetch the next tuple in either forward or back
 * direction into *stup.  Returns FALSE if no more tuples.
 * If *should_free is set, the caller must pfree stup.tuple when done with it.
 */
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                          SortTuple *stup, bool *should_free)
{
    unsigned int tuplen;

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            Assert(forward || state->randomAccess);
            *should_free = false;
            if (forward)
            {
                if (state->current < state->memtupcount)
                {
                    *stup = state->memtuples[state->current++];
                    return true;
                }
                state->eof_reached = true;
                return false;
            }
            else
            {
                if (state->current <= 0)
                    return false;

                /*
                 * If all tuples have been fetched already, return the last
                 * tuple; otherwise return the tuple before the one last
                 * returned.
                 */
                if (state->eof_reached)
                    state->eof_reached = false;
                else
                {
                    state->current--;   /* last returned tuple */
                    if (state->current <= 0)
                        return false;
                }
                *stup = state->memtuples[state->current - 1];
                return true;
            }
            break;

        case TSS_SORTEDONTAPE:
            Assert(forward || state->randomAccess);
            *should_free = true;
            if (forward)
            {
                if (state->eof_reached)
                    return false;
                if ((tuplen = getlen(state, state->result_tape, true)) != 0)
                {
                    READTUP(state, stup, state->result_tape, tuplen);
                    return true;
                }
                else
                {
                    state->eof_reached = true;
                    return false;
                }
            }

            /*
             * Backward.
             *
             * If all tuples have been fetched already, return the last
             * tuple; otherwise return the tuple before the one last
             * returned.
             */
            if (state->eof_reached)
            {
                /*
                 * Seek position is pointing just past the zero tuplen at the
                 * end of file; back up to fetch last tuple's ending length
                 * word.  If seek fails we must have a completely empty file.
                 */
                if (!LogicalTapeBackspace(state->tapeset,
                                          state->result_tape,
                                          2 * sizeof(unsigned int)))
                    return false;
                state->eof_reached = false;
            }
            else
            {
                /*
                 * Back up and fetch previously-returned tuple's ending length
                 * word.  If seek fails, assume we are at start of file.
                 */
                if (!LogicalTapeBackspace(state->tapeset,
                                          state->result_tape,
                                          sizeof(unsigned int)))
                    return false;
                tuplen = getlen(state, state->result_tape, false);

                /*
                 * Back up to get ending length word of tuple before it.
                 */
                if (!LogicalTapeBackspace(state->tapeset,
                                          state->result_tape,
                                          tuplen + 2 * sizeof(unsigned int)))
                {
                    /*
                     * If that fails, presumably the prev tuple is the first
                     * in the file.  Back up so that it becomes next to read
                     * in forward direction (not obviously right, but that is
                     * what in-memory case does).
                     */
                    if (!LogicalTapeBackspace(state->tapeset,
                                              state->result_tape,
                                              tuplen + sizeof(unsigned int)))
                        elog(ERROR, "bogus tuple length in backward scan");
                    return false;
                }
            }

            tuplen = getlen(state, state->result_tape, false);

            /*
             * Now we have the length of the prior tuple, back up and read it.
             * Note: READTUP expects we are positioned after the initial
             * length word of the tuple, so back up to that point.
             */
            if (!LogicalTapeBackspace(state->tapeset,
                                      state->result_tape,
                                      tuplen))
                elog(ERROR, "bogus tuple length in backward scan");
            READTUP(state, stup, state->result_tape, tuplen);
            return true;

        case TSS_FINALMERGE:
            Assert(forward);
            *should_free = true;

            /*
             * This code should match the inner loop of mergeonerun().
             */
            if (state->memtupcount > 0)
            {
                int         srcTape = state->memtuples[0].tupindex;
                Size        tuplen;
                int         tupIndex;
                SortTuple  *newtup;

                *stup = state->memtuples[0];
                /* returned tuple is no longer counted in our memory space */
                if (stup->tuple)
                {
                    tuplen = GetMemoryChunkSpace(stup->tuple);
                    state->availMem += tuplen;
                    state->mergeavailmem[srcTape] += tuplen;
                }
                tuplesort_heap_siftup(state, false);
                if ((tupIndex = state->mergenext[srcTape]) == 0)
                {
                    /*
                     * out of preloaded data on this tape, try to read more
                     */
                    mergepreread(state);

                    /*
                     * if still no data, we've reached end of run on this tape
                     */
                    if ((tupIndex = state->mergenext[srcTape]) == 0)
                        return true;
                }
                /* pull next preread tuple from list, insert in heap */
                newtup = &state->memtuples[tupIndex];
                state->mergenext[srcTape] = newtup->tupindex;
                if (state->mergenext[srcTape] == 0)
                    state->mergelast[srcTape] = 0;
                tuplesort_heap_insert(state, newtup, srcTape, false);
                /* put the now-unused memtuples entry on the freelist */
                newtup->tupindex = state->mergefreelist;
                state->mergefreelist = tupIndex;
                state->mergeslotsfree++;
                return true;
            }
            return false;

        default:
            elog(ERROR, "invalid tuplesort state");
            return false;       /* keep compiler quiet */
    }
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If *should_free is set, the
 * caller must pfree the returned tuple when done with it.
 */
void *
tuplesort_gettuple(Tuplesortstate *state, bool forward,
                   bool *should_free)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
    SortTuple   stup;

    if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
        stup.tuple = NULL;

    MemoryContextSwitchTo(oldcontext);

    return stup.tuple;
}

/*
 * Fetch the next Datum in either forward or back direction.
 * Returns FALSE if no more datums.
 *
 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
 * and is now owned by the caller.
 */
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
                   Datum *val, bool *isNull)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
    SortTuple   stup;
    bool        should_free;

    if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
    {
        MemoryContextSwitchTo(oldcontext);
        return false;
    }

    if (stup.isnull1 || state->datumTypeByVal)
    {
        *val = stup.datum1;
        *isNull = stup.isnull1;
    }
    else
    {
        if (should_free)
            *val = stup.datum1;
        else
            *val = datumCopy(stup.datum1, false, state->datumTypeLen);
        *isNull = false;
    }

    MemoryContextSwitchTo(oldcontext);

    return true;
}

/*
 * tuplesort_merge_order - report merge order we'll use for given memory
 * (note: "merge order" just means the number of input tapes in the merge).
 *
 * This is exported for use by the planner.  allowedMem is in bytes.
 */
int
tuplesort_merge_order(long allowedMem)
{
    int         mOrder;

    /*
     * We need one tape for each merge input, plus another one for the
     * output, and each of these tapes needs buffer space.  In addition
     * we want MERGE_BUFFER_SIZE workspace per input tape (but the output
     * tape doesn't count).
     *
     * Note: you might be thinking we need to account for the memtuples[]
     * array in this calculation, but we effectively treat that as part of
     * the MERGE_BUFFER_SIZE workspace.
     */
    mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
        (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

    /* Even in minimum memory, use at least a MINORDER merge */
    mOrder = Max(mOrder, MINORDER);

    return mOrder;
}

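/*
 * Worked example (an editorial addition, not in the original source): with
 * the default BLCKSZ of 8192, TAPE_BUFFER_OVERHEAD is 24576 bytes and
 * MERGE_BUFFER_SIZE is 262144 bytes.  For allowedMem = 16MB the formula in
 * tuplesort_merge_order gives (16777216 - 24576) / (262144 + 24576) = 58,
 * i.e. a 58-way merge using 59 tapes.  For allowedMem = 1MB it gives 3,
 * which the Max() call raises to MINORDER (6).
 */
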
/*
 * inittapes - initialize for tape sorting.
 *
 * This is called only if we have found we don't have room to sort in memory.
 */
static void
inittapes(Tuplesortstate *state)
{
    int         maxTapes,
                ntuples,
                j;
    long        tapeSpace;

    /* Compute number of tapes to use: merge order plus 1 */
    maxTapes = tuplesort_merge_order(state->allowedMem) + 1;

    /*
     * We must have at least 2*maxTapes slots in the memtuples[] array, else
     * we'd not have room for merge heap plus preread.  It seems unlikely
     * that this case would ever occur, but be safe.
     */
    maxTapes = Min(maxTapes, state->memtupsize / 2);

    state->maxTapes = maxTapes;
    state->tapeRange = maxTapes - 1;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "switching to external sort with %d tapes: %s",
             maxTapes, pg_rusage_show(&state->ru_start));
#endif

    /*
     * Decrease availMem to reflect the space needed for tape buffers; but
     * don't decrease it to the point that we have no room for tuples.
     * (That case is only likely to occur if sorting pass-by-value Datums;
     * in all other scenarios the memtuples[] array is unlikely to occupy
     * more than half of allowedMem.  In the pass-by-value case it's not
     * important to account for tuple space, so we don't care if LACKMEM
     * becomes inaccurate.)
     */
    tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
    if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
        USEMEM(state, tapeSpace);

    /*
     * Create the tape set and allocate the per-tape data arrays.
     */
    state->tapeset = LogicalTapeSetCreate(maxTapes);

    state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
    state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
    state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
    state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
    state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));

    /*
     * Convert the unsorted contents of memtuples[] into a heap.  Each tuple is
     * marked as belonging to run number zero.
     *
     * NOTE: we pass false for checkIndex since there's no point in comparing
     * indexes in this step, even though we do intend the indexes to be part
     * of the sort key...
     */
    ntuples = state->memtupcount;
    state->memtupcount = 0;     /* make the heap empty */
    for (j = 0; j < ntuples; j++)
    {
        /* Must copy source tuple to avoid possible overwrite */
        SortTuple   stup = state->memtuples[j];

        tuplesort_heap_insert(state, &stup, 0, false);
    }
    Assert(state->memtupcount == ntuples);

    state->currentRun = 0;

    /*
     * Initialize variables of Algorithm D (step D1).
     */
    for (j = 0; j < maxTapes; j++)
    {
        state->tp_fib[j] = 1;
        state->tp_runs[j] = 0;
        state->tp_dummy[j] = 1;
        state->tp_tapenum[j] = j;
    }
    state->tp_fib[state->tapeRange] = 0;
    state->tp_dummy[state->tapeRange] = 0;

    state->Level = 1;
    state->destTape = 0;

    state->status = TSS_BUILDRUNS;
}

/*
 * selectnewtape -- select new tape for new initial run.
 *
 * This is called after finishing a run when we know another run
 * must be started.  This implements steps D3, D4 of Algorithm D.
 */
static void
selectnewtape(Tuplesortstate *state)
{
    int         j;
    int         a;

    /* Step D3: advance j (destTape) */
    if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
    {
        state->destTape++;
        return;
    }
    if (state->tp_dummy[state->destTape] != 0)
    {
        state->destTape = 0;
        return;
    }

    /* Step D4: increase level */
    state->Level++;
    a = state->tp_fib[0];
    for (j = 0; j < state->tapeRange; j++)
    {
        state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
        state->tp_fib[j] = a + state->tp_fib[j + 1];
    }
    state->destTape = 0;
}

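/*
 * Worked trace (an editorial addition, not in the original source) of the
 * level increase in step D4 for maxTapes = 4 (tapeRange = 3): starting from
 * tp_fib = {1,1,1} at level 1, one pass of the loop above yields
 * tp_fib = {2,2,1} (5 runs total) at level 2, and another pass yields
 * tp_fib = {4,3,2} (9 runs total) at level 3 --- the generalized Fibonacci
 * run distribution that Algorithm D's merge pattern requires.
 */
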
/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Algorithm D.  All input data has
 * already been written to initial runs on tape (see dumptuples).
 */
static void
mergeruns(Tuplesortstate *state)
{
    int         tapenum,
                svTape,
                svRuns,
                svDummy;

    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    /*
     * If we produced only one initial run (quite likely if the total data
     * volume is between 1X and 2X workMem), we can just use that tape as the
     * finished output, rather than doing a useless merge.  (This obvious
     * optimization is not in Knuth's algorithm.)
     */
    if (state->currentRun == 1)
    {
        state->result_tape = state->tp_tapenum[state->destTape];
        /* must freeze and rewind the finished output tape */
        LogicalTapeFreeze(state->tapeset, state->result_tape);
        state->status = TSS_SORTEDONTAPE;
        return;
    }

    /* End of step D2: rewind all output tapes to prepare for merging */
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
        LogicalTapeRewind(state->tapeset, tapenum, false);

    for (;;)
    {
        /*
         * At this point we know that tape[T] is empty.  If there's just one
         * (real or dummy) run left on each input tape, then only one merge
         * pass remains.  If we don't have to produce a materialized sorted
         * tape, we can stop at this point and do the final merge on-the-fly.
         */
        if (!state->randomAccess)
        {
            bool        allOneRun = true;

            Assert(state->tp_runs[state->tapeRange] == 0);
            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
                {
                    allOneRun = false;
                    break;
                }
            }
            if (allOneRun)
            {
                /* Tell logtape.c we won't be writing anymore */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
                /* Initialize for the final merge pass */
                beginmerge(state);
                state->status = TSS_FINALMERGE;
                return;
            }
        }

        /* Step D5: merge runs onto tape[T] until tape[P] is empty */
        while (state->tp_runs[state->tapeRange - 1] ||
               state->tp_dummy[state->tapeRange - 1])
        {
            bool        allDummy = true;

            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_dummy[tapenum] == 0)
                {
                    allDummy = false;
                    break;
                }
            }

            if (allDummy)
            {
                state->tp_dummy[state->tapeRange]++;
                for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
                    state->tp_dummy[tapenum]--;
            }
            else
                mergeonerun(state);
        }

        /* Step D6: decrease level */
        if (--state->Level == 0)
            break;
        /* rewind output tape T to use as new input */
        LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
                          false);
        /* rewind used-up input tape P, and prepare it for write pass */
        LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
                          true);
        state->tp_runs[state->tapeRange - 1] = 0;

        /*
         * reassign tape units per step D6; note we no longer care about A[]
         */
        svTape = state->tp_tapenum[state->tapeRange];
        svDummy = state->tp_dummy[state->tapeRange];
        svRuns = state->tp_runs[state->tapeRange];
        for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
        {
            state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
            state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
            state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
        }
        state->tp_tapenum[0] = svTape;
        state->tp_dummy[0] = svDummy;
        state->tp_runs[0] = svRuns;
    }

    /*
     * Done.  Knuth says that the result is on TAPE[1], but since we exited
     * the loop without performing the last iteration of step D6, we have not
     * rearranged the tape unit assignment, and therefore the result is on
     * TAPE[T].  We need to do it this way so that we can freeze the final
     * output tape while rewinding it.  The last iteration of step D6 would be
     * a waste of cycles anyway...
     */
    state->result_tape = state->tp_tapenum[state->tapeRange];
    LogicalTapeFreeze(state->tapeset, state->result_tape);
    state->status = TSS_SORTEDONTAPE;
}

/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  We know that the
 * output tape is TAPE[T].
 */
static void
mergeonerun(Tuplesortstate *state)
{
    int         destTape = state->tp_tapenum[state->tapeRange];
    int         srcTape;
    int         tupIndex;
    SortTuple  *tup;
    long        priorAvail,
                spaceFreed;

    /*
     * Start the merge by loading one tuple from each active source tape into
     * the heap.  We can also decrease the input run/dummy run counts.
     */
    beginmerge(state);

    /*
     * Execute merge by repeatedly extracting lowest tuple in heap, writing it
     * out, and replacing it with next tuple from same tape (if there is
     * another one).
     */
    while (state->memtupcount > 0)
    {
        CHECK_FOR_INTERRUPTS();
        /* write the tuple to destTape */
        priorAvail = state->availMem;
        srcTape = state->memtuples[0].tupindex;
        WRITETUP(state, destTape, &state->memtuples[0]);
        /* writetup adjusted total free space, now fix per-tape space */
        spaceFreed = state->availMem - priorAvail;
        state->mergeavailmem[srcTape] += spaceFreed;
        /* compact the heap */
        tuplesort_heap_siftup(state, false);
        if ((tupIndex = state->mergenext[srcTape]) == 0)
        {
            /* out of preloaded data on this tape, try to read more */
            mergepreread(state);
            /* if still no data, we've reached end of run on this tape */
            if ((tupIndex = state->mergenext[srcTape]) == 0)
                continue;
        }
        /* pull next preread tuple from list, insert in heap */
        tup = &state->memtuples[tupIndex];
        state->mergenext[srcTape] = tup->tupindex;
        if (state->mergenext[srcTape] == 0)
            state->mergelast[srcTape] = 0;
        tuplesort_heap_insert(state, tup, srcTape, false);
        /* put the now-unused memtuples entry on the freelist */
        tup->tupindex = state->mergefreelist;
        state->mergefreelist = tupIndex;
        state->mergeslotsfree++;
    }

    /*
     * When the heap empties, we're done.  Write an end-of-run marker on the
     * output tape, and increment its count of real runs.
     */
    markrunend(state, destTape);
    state->tp_runs[state->tapeRange]++;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
             pg_rusage_show(&state->ru_start));
#endif
}

/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[]. Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
 */
static void
beginmerge(Tuplesortstate *state)
{
    int         activeTapes;
    int         tapenum;
    int         srcTape;

    /* Heap should be empty here */
    Assert(state->memtupcount == 0);

    /* Clear merge-pass state variables */
    memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive));
    memset(state->mergenext, 0, state->maxTapes * sizeof(*state->mergenext));
    memset(state->mergelast, 0, state->maxTapes * sizeof(*state->mergelast));
    memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
    state->mergefreelist = 0;   /* nothing in the freelist */
    state->mergefirstfree = state->maxTapes;    /* 1st slot avail for preread */
    state->mergeslotsfree = state->memtupsize - state->mergefirstfree;
    Assert(state->mergeslotsfree >= state->maxTapes);

    /* Adjust run counts and mark the active tapes */
    activeTapes = 0;
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
    {
        if (state->tp_dummy[tapenum] > 0)
            state->tp_dummy[tapenum]--;
        else
        {
            Assert(state->tp_runs[tapenum] > 0);
            state->tp_runs[tapenum]--;
            srcTape = state->tp_tapenum[tapenum];
            state->mergeactive[srcTape] = true;
            activeTapes++;
        }
    }
    state->activeTapes = activeTapes;

    /*
     * Initialize space allocation to let each active input tape have an equal
     * share of preread space.
     */
    Assert(activeTapes > 0);
    state->spacePerTape = state->availMem / activeTapes;
    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
    {
        if (state->mergeactive[srcTape])
            state->mergeavailmem[srcTape] = state->spacePerTape;
    }

    /*
     * Preread as many tuples as possible (and at least one) from each active
     * tape
     */
    mergepreread(state);

    /* Load the merge heap with the first tuple from each input tape */
    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
    {
        int         tupIndex = state->mergenext[srcTape];
        SortTuple  *tup;

        if (tupIndex)
        {
            tup = &state->memtuples[tupIndex];
            state->mergenext[srcTape] = tup->tupindex;
            if (state->mergenext[srcTape] == 0)
                state->mergelast[srcTape] = 0;
            tuplesort_heap_insert(state, tup, srcTape, false);
            /* put the now-unused memtuples entry on the freelist */
            tup->tupindex = state->mergefreelist;
            state->mergefreelist = tupIndex;
            state->mergeslotsfree++;
        }
    }
}

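/*
 * Illustrative sketch (not part of the original file): the freelist managed
 * above chains free memtuples[] slots through their own tupindex fields,
 * with 0 acting as the list terminator (slot 0 is never on the freelist).
 * The same technique over a minimal standalone struct:
 *
 *	struct slot { int next; };		// stands in for SortTuple.tupindex
 *	static struct slot pool[100];
 *	static int	freehead = 0;		// 0 means "list is empty"
 *
 *	static void
 *	slot_free(int i)
 *	{
 *		pool[i].next = freehead;
 *		freehead = i;
 *	}
 *
 *	static int
 *	slot_alloc(void)
 *	{
 *		int		i = freehead;
 *
 *		if (i != 0)
 *			freehead = pool[i].next;
 *		return i;				// 0 here means "no free slot"
 *	}
 */
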
/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of this file. Load tuples from each
 * active source tape until the tape's run is exhausted or it has used up
 * its fair share of available memory. In any case, we guarantee that there
 * is at least one preread tuple available from each unexhausted input tape.
 */
static void
mergepreread(Tuplesortstate *state)
{
    int         srcTape;
    unsigned int tuplen;
    SortTuple   stup;
    int         tupIndex;
    long        priorAvail,
                spaceUsed;

    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
    {
        if (!state->mergeactive[srcTape])
            continue;

        /*
         * Skip reading from any tape that still has at least half of its
         * target memory filled with tuples (threshold fraction may need
         * adjustment?). This avoids reading just a few tuples when the
         * incoming runs are not being consumed evenly.
         */
        if (state->mergenext[srcTape] != 0 &&
            state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
            continue;

        /*
         * Read tuples from this tape until it has used up its free memory,
         * or we are low on memtuples slots; but ensure that we have at least
         * one tuple.
         */
        priorAvail = state->availMem;
        state->availMem = state->mergeavailmem[srcTape];
        while ((!LACKMEM(state) && state->mergeslotsfree > state->tapeRange) ||
               state->mergenext[srcTape] == 0)
        {
            /* read next tuple, if any */
            if ((tuplen = getlen(state, srcTape, true)) == 0)
            {
                state->mergeactive[srcTape] = false;
                break;
            }
            READTUP(state, &stup, srcTape, tuplen);
            /* find a free slot in memtuples[] for it */
            tupIndex = state->mergefreelist;
            if (tupIndex)
                state->mergefreelist = state->memtuples[tupIndex].tupindex;
            else
            {
                tupIndex = state->mergefirstfree++;
                Assert(tupIndex < state->memtupsize);
            }
            state->mergeslotsfree--;
            /* store tuple, append to list for its tape */
            stup.tupindex = 0;
            state->memtuples[tupIndex] = stup;
            if (state->mergelast[srcTape])
                state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
            else
                state->mergenext[srcTape] = tupIndex;
            state->mergelast[srcTape] = tupIndex;
        }
        /* update per-tape and global availmem counts */
        spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
        state->mergeavailmem[srcTape] = state->availMem;
        state->availMem = priorAvail - spaceUsed;
    }
}

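/*
 * Illustrative sketch (not part of the original file): the refill policy in
 * mergepreread() in isolation. A tape is topped up only once it has freed
 * at least half of its fair share (or has nothing preread at all), so each
 * refill is a long sequential read rather than a dribble of single tuples:
 *
 *	static bool
 *	needs_refill(long avail, long fair_share, bool has_tuples)
 *	{
 *		if (!has_tuples)
 *			return true;	// never let an unexhausted tape run dry
 *		return avail > fair_share / 2;	// over half the share is free again
 *	}
 */
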
/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to). We also
 * insist there be at least one free slot in the memtuples[] array.
 *
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data). If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
    while (alltuples ||
           (LACKMEM(state) && state->memtupcount > 1) ||
           state->memtupcount >= state->memtupsize)
    {
        /*
         * Dump the heap's frontmost entry, and sift up to remove it from the
         * heap.
         */
        Assert(state->memtupcount > 0);
        WRITETUP(state, state->tp_tapenum[state->destTape],
                 &state->memtuples[0]);
        tuplesort_heap_siftup(state, true);

        /*
         * If the heap is empty *or* top run number has changed, we've
         * finished the current run.
         */
        if (state->memtupcount == 0 ||
            state->currentRun != state->memtuples[0].tupindex)
        {
            markrunend(state, state->tp_tapenum[state->destTape]);
            state->currentRun++;
            state->tp_runs[state->destTape]++;
            state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

#ifdef TRACE_SORT
            if (trace_sort)
                elog(LOG, "finished writing%s run %d to tape %d: %s",
                     (state->memtupcount == 0) ? " final" : "",
                     state->currentRun, state->destTape,
                     pg_rusage_show(&state->ru_start));
#endif

            /*
             * Done if heap is empty, else prepare for new run.
             */
            if (state->memtupcount == 0)
                break;
            Assert(state->currentRun == state->memtuples[0].tupindex);
            selectnewtape(state);
        }
    }
}

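/*
 * Illustrative sketch (not part of the original file): how run numbers
 * drive run boundaries during replacement selection. Each heap entry
 * carries the run it belongs to, and (run number, key) is the effective
 * sort key (cf. HEAPCOMPARE below), so the entry at the top of the heap
 * changes run number only when the current run is completely drained.
 * A hypothetical helper mirroring the run-assignment rule applied when
 * tuples arrive during run building:
 *
 *	// a tuple that sorts before the current heap top can no longer be
 *	// emitted in the current (ascending) run, so tag it for the next one
 *	static int
 *	choose_run(int key, int heap_top_key, int current_run)
 *	{
 *		return (key >= heap_top_key) ? current_run : current_run + 1;
 *	}
 */
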
/*
 * tuplesort_rescan		- rewind and replay the scan
 */
void
tuplesort_rescan(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

    Assert(state->randomAccess);

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            state->current = 0;
            state->eof_reached = false;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;
        case TSS_SORTEDONTAPE:
            LogicalTapeRewind(state->tapeset,
                              state->result_tape,
                              false);
            state->eof_reached = false;
            state->markpos_block = 0L;
            state->markpos_offset = 0;
            state->markpos_eof = false;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

    MemoryContextSwitchTo(oldcontext);
}

/*
 * tuplesort_markpos	- saves current position in the merged sort file
 */
void
tuplesort_markpos(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

    Assert(state->randomAccess);

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            state->markpos_offset = state->current;
            state->markpos_eof = state->eof_reached;
            break;
        case TSS_SORTEDONTAPE:
            LogicalTapeTell(state->tapeset,
                            state->result_tape,
                            &state->markpos_block,
                            &state->markpos_offset);
            state->markpos_eof = state->eof_reached;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

    MemoryContextSwitchTo(oldcontext);
}

/*
 * tuplesort_restorepos - restores current position in merged sort file to
 *						  last saved position
 */
void
tuplesort_restorepos(Tuplesortstate *state)
{
    MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

    Assert(state->randomAccess);

    switch (state->status)
    {
        case TSS_SORTEDINMEM:
            state->current = state->markpos_offset;
            state->eof_reached = state->markpos_eof;
            break;
        case TSS_SORTEDONTAPE:
            if (!LogicalTapeSeek(state->tapeset,
                                 state->result_tape,
                                 state->markpos_block,
                                 state->markpos_offset))
                elog(ERROR, "tuplesort_restorepos failed");
            state->eof_reached = state->markpos_eof;
            break;
        default:
            elog(ERROR, "invalid tuplesort state");
            break;
    }

    MemoryContextSwitchTo(oldcontext);
}

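/*
 * Illustrative sketch (not part of the original file): the mark/restore
 * protocol as a caller sees it. This is how, for example, a merge join
 * re-scans part of its inner input; "fetch" is a hypothetical stand-in
 * for the tuplesort fetch routine.
 *
 *	tuplesort_markpos(state);		// remember the current read position
 *	while (fetch(state, &tup))		// read ahead as far as needed
 *		...
 *	tuplesort_restorepos(state);	// rewind to the mark and re-read
 *
 * Both calls require the sort to have been built with the randomAccess
 * option, since the on-tape case must be able to seek.
 */
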
/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * Compare two SortTuples. If checkIndex is true, use the tuple index
 * as the front of the sort key; otherwise, no.
 */

#define HEAPCOMPARE(tup1,tup2) \
    (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
     ((tup1)->tupindex) - ((tup2)->tupindex) : \
     COMPARETUP(state, tup1, tup2))

/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant. Caller is responsible for ensuring there's room.
 *
 * Note: we assume *tuple is a temporary variable that can be scribbled on.
 * For some callers, tuple actually points to a memtuples[] entry above the
 * end of the heap. This is safe as long as it's not immediately adjacent
 * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
 * is, it might get overwritten before being moved into the heap!
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                      int tupleindex, bool checkIndex)
{
    SortTuple  *memtuples;
    int         j;

    /*
     * Save the tupleindex --- see notes above about writing on *tuple.
     * It's a historical artifact that tupleindex is passed as a separate
     * argument and not in *tuple, but it's notationally convenient so
     * let's leave it that way.
     */
    tuple->tupindex = tupleindex;

    memtuples = state->memtuples;
    Assert(state->memtupcount < state->memtupsize);

    /*
     * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
     * using 1-based array indexes, not 0-based.
     */
    j = state->memtupcount++;
    while (j > 0)
    {
        int         i = (j - 1) >> 1;

        if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
            break;
        memtuples[j] = memtuples[i];
        j = i;
    }
    memtuples[j] = *tuple;
}

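/*
 * Illustrative sketch (not part of the original file): the same sift-up
 * over a plain int min-heap, 0-based as above (the parent of slot j is
 * (j - 1) / 2):
 *
 *	static void
 *	int_heap_insert(int *h, int *count, int v)
 *	{
 *		int		j = (*count)++;
 *
 *		while (j > 0)
 *		{
 *			int		i = (j - 1) / 2;
 *
 *			if (v >= h[i])
 *				break;
 *			h[j] = h[i];	// pull parent down into the hole
 *			j = i;
 *		}
 *		h[j] = v;
 *	}
 */
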
/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 */
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
    SortTuple  *memtuples = state->memtuples;
    SortTuple  *tuple;
    int         i,
                n;

    if (--state->memtupcount <= 0)
        return;
    n = state->memtupcount;
    tuple = &memtuples[n];      /* tuple that must be reinserted */
    i = 0;                      /* i is where the "hole" is */
    for (;;)
    {
        int         j = 2 * i + 1;

        if (j >= n)
            break;
        if (j + 1 < n &&
            HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
            j++;
        if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
            break;
        memtuples[i] = memtuples[j];
        i = j;
    }
    memtuples[i] = *tuple;
}

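/*
 * Illustrative sketch (not part of the original file): the matching
 * sift-down over a plain int min-heap. The last element is logically
 * removed and re-sunk from the root, exactly as above:
 *
 *	static void
 *	int_heap_delete_top(int *h, int *count)
 *	{
 *		int		n = --(*count);
 *		int		v = h[n];		// element to reinsert
 *		int		i = 0;
 *
 *		for (;;)
 *		{
 *			int		j = 2 * i + 1;
 *
 *			if (j >= n)
 *				break;
 *			if (j + 1 < n && h[j + 1] < h[j])
 *				j++;			// pick the smaller child
 *			if (v <= h[j])
 *				break;
 *			h[i] = h[j];		// pull child up into the hole
 *			i = j;
 *		}
 *		h[i] = v;
 *	}
 */
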
/*
 * Tape interface routines
 */

static unsigned int
getlen(Tuplesortstate *state, int tapenum, bool eofOK)
{
    unsigned int len;

    if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
                        sizeof(len)) != sizeof(len))
        elog(ERROR, "unexpected end of tape");
    if (len == 0 && !eofOK)
        elog(ERROR, "unexpected end of data");
    return len;
}

static void
markrunend(Tuplesortstate *state, int tapenum)
{
    unsigned int len = 0;

    LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}

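/*
 * Illustrative sketch (not part of the original file): the on-tape record
 * format implied by getlen()/markrunend() and the writetup routines. Each
 * record is a length word followed by that many payload bytes, and a
 * length word of zero is the end-of-run sentinel:
 *
 *	[len1][payload1 ...][len2][payload2 ...] ... [0]
 *
 * so a run is consumed like this (the mergepreread pattern):
 *
 *	unsigned int len;
 *
 *	while ((len = getlen(state, tapenum, true)) != 0)
 *		READTUP(state, &stup, tapenum, len);	// consume one record
 *
 * When randomAccess is true the writers also append a trailing copy of the
 * length word, so a reader positioned after a record can step backwards:
 * read the trailing word, then seek back over the payload.
 */
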
/*
 * qsort interface
 */

static int
qsort_comparetup(const void *a, const void *b)
{
    /* The passed pointers are pointers to SortTuple ... */
    return COMPARETUP(qsort_tuplesortstate,
                      (const SortTuple *) a,
                      (const SortTuple *) b);
}

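/*
 * Illustrative sketch (not part of the original file): the pattern above in
 * miniature. ANSI C qsort() passes no user context to its comparator, so
 * any extra state must travel through a file-scope variable (here,
 * qsort_tuplesortstate), at the price of making the sort non-reentrant:
 *
 *	#include <stdlib.h>
 *
 *	static int	sort_descending;	// plays the role of qsort_tuplesortstate
 *
 *	static int
 *	cmp_int(const void *a, const void *b)
 *	{
 *		int		x = *(const int *) a;
 *		int		y = *(const int *) b;
 *		int		d = (x > y) - (x < y);	// overflow-safe 3-way result
 *
 *		return sort_descending ? -d : d;
 *	}
 *
 *	// usage: sort_descending = 1; qsort(arr, n, sizeof(int), cmp_int);
 */
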
/*
 * This routine selects an appropriate sorting function to implement
 * a sort operator as efficiently as possible. The straightforward
 * method is to use the operator's implementation proc --- ie, "<"
 * comparison. However, that way often requires two calls of the function
 * per comparison. If we can find a btree three-way comparator function
 * associated with the operator, we can use it to do the comparisons
 * more efficiently. We also support the possibility that the operator
 * is ">" (descending sort), in which case we have to reverse the output
 * of the btree comparator.
 *
 * Possibly this should live somewhere else (backend/catalog/, maybe?).
 */
void
SelectSortFunction(Oid sortOperator,
                   RegProcedure *sortFunction,
                   SortFunctionKind *kind)
{
    CatCList   *catlist;
    int         i;
    HeapTuple   tuple;
    Form_pg_operator optup;
    Oid         opclass = InvalidOid;

    /*
     * Search pg_amop to see if the target operator is registered as the "<"
     * or ">" operator of any btree opclass. It's possible that it might be
     * registered both ways (eg, if someone were to build a "reverse sort"
     * opclass for some reason); prefer the "<" case if so. If the operator is
     * registered the same way in multiple opclasses, assume we can use the
     * associated comparator function from any one.
     */
    catlist = SearchSysCacheList(AMOPOPID, 1,
                                 ObjectIdGetDatum(sortOperator),
                                 0, 0, 0);

    for (i = 0; i < catlist->n_members; i++)
    {
        Form_pg_amop aform;

        tuple = &catlist->members[i]->tuple;
        aform = (Form_pg_amop) GETSTRUCT(tuple);

        if (!opclass_is_btree(aform->amopclaid))
            continue;
        /* must be of default subtype, too */
        if (aform->amopsubtype != InvalidOid)
            continue;

        if (aform->amopstrategy == BTLessStrategyNumber)
        {
            opclass = aform->amopclaid;
            *kind = SORTFUNC_CMP;
            break;              /* done looking */
        }
        else if (aform->amopstrategy == BTGreaterStrategyNumber)
        {
            opclass = aform->amopclaid;
            *kind = SORTFUNC_REVCMP;
            /* keep scanning in hopes of finding a BTLess entry */
        }
    }

    ReleaseSysCacheList(catlist);

    if (OidIsValid(opclass))
    {
        /* Found a suitable opclass, get its default comparator function */
        *sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
        Assert(RegProcedureIsValid(*sortFunction));
        return;
    }

    /*
     * Can't find a comparator, so use the operator as-is. Decide whether it
     * is forward or reverse sort by looking at its name (grotty, but this
     * only matters for deciding which end NULLs should get sorted to). XXX
     * possibly better idea: see whether its selectivity function is
     * scalargtsel?
     */
    tuple = SearchSysCache(OPEROID,
                           ObjectIdGetDatum(sortOperator),
                           0, 0, 0);
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for operator %u", sortOperator);
    optup = (Form_pg_operator) GETSTRUCT(tuple);
    if (strcmp(NameStr(optup->oprname), ">") == 0)
        *kind = SORTFUNC_REVLT;
    else
        *kind = SORTFUNC_LT;
    *sortFunction = optup->oprcode;
    ReleaseSysCache(tuple);

    Assert(RegProcedureIsValid(*sortFunction));
}

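/*
 * Illustrative sketch (not part of the original file): how a caller ties
 * SelectSortFunction() to ApplySortFunction() below. "ltOpr" is a
 * hypothetical OID of the sort column's "<" operator; d1/d2 are Datums.
 *
 *	RegProcedure sortproc;
 *	SortFunctionKind kind;
 *	FmgrInfo	flinfo;
 *
 *	SelectSortFunction(ltOpr, &sortproc, &kind);
 *	fmgr_info(sortproc, &flinfo);
 *	// one 3-way call per comparison if a btree comparator was found
 *	// (SORTFUNC_CMP), up to two "<" calls otherwise:
 *	if (ApplySortFunction(&flinfo, kind, d1, false, d2, false) < 0)
 *		...
 */
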
/*
 * Inline-able copy of FunctionCall2() to save some cycles in sorting.
 */
static inline Datum
myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
{
    FunctionCallInfoData fcinfo;
    Datum       result;

    InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL);

    fcinfo.arg[0] = arg1;
    fcinfo.arg[1] = arg2;
    fcinfo.argnull[0] = false;
    fcinfo.argnull[1] = false;

    result = FunctionCallInvoke(&fcinfo);

    /* Check for null result, since caller is clearly not expecting one */
    if (fcinfo.isnull)
        elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);

    return result;
}

/*
 * Apply a sort function (by now converted to fmgr lookup form)
 * and return a 3-way comparison result. This takes care of handling
 * NULLs and sort ordering direction properly.
 */
static inline int32
inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
                        Datum datum1, bool isNull1,
                        Datum datum2, bool isNull2)
{
    switch (kind)
    {
        case SORTFUNC_LT:
            if (isNull1)
            {
                if (isNull2)
                    return 0;
                return 1;       /* NULL sorts after non-NULL */
            }
            if (isNull2)
                return -1;
            if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
                return -1;      /* a < b */
            if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
                return 1;       /* a > b */
            return 0;

        case SORTFUNC_REVLT:
            /* We reverse the ordering of NULLs, but not the operator */
            if (isNull1)
            {
                if (isNull2)
                    return 0;
                return -1;      /* NULL sorts before non-NULL */
            }
            if (isNull2)
                return 1;
            if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
                return -1;      /* a < b */
            if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
                return 1;       /* a > b */
            return 0;

        case SORTFUNC_CMP:
            if (isNull1)
            {
                if (isNull2)
                    return 0;
                return 1;       /* NULL sorts after non-NULL */
            }
            if (isNull2)
                return -1;
            return DatumGetInt32(myFunctionCall2(sortFunction,
                                                 datum1, datum2));

        case SORTFUNC_REVCMP:
            if (isNull1)
            {
                if (isNull2)
                    return 0;
                return -1;      /* NULL sorts before non-NULL */
            }
            if (isNull2)
                return 1;
            return -DatumGetInt32(myFunctionCall2(sortFunction,
                                                  datum1, datum2));

        default:
            elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
            return 0;           /* can't get here, but keep compiler quiet */
    }
}

/*
 * Non-inline ApplySortFunction() --- this is needed only to conform to
 * C99's brain-dead notions about how to implement inline functions...
 */
int32
ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
                  Datum datum1, bool isNull1,
                  Datum datum2, bool isNull2)
{
    return inlineApplySortFunction(sortFunction, kind,
                                   datum1, isNull1,
                                   datum2, isNull2);
}

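/*
 * Illustrative sketch (not part of the original file): why SORTFUNC_CMP is
 * cheaper than SORTFUNC_LT. Deriving a 3-way result from "<" alone needs
 * up to two function calls per comparison:
 *
 *	if (lt(a, b))
 *		return -1;
 *	if (lt(b, a))
 *		return 1;
 *	return 0;		// neither is smaller, so they are equal
 *
 * whereas a btree comparator answers in a single call: return cmp(a, b).
 * Note also that the NULL handling above is direction-sensitive: NULLs sort
 * after non-NULLs in a forward sort and before them in a reverse sort, so
 * they land at the same physical end of the output either way.
 */
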
/*
 * Routines specialized for HeapTuple case
 */

static int
comparetup_heap(Tuplesortstate *state, const SortTuple *a, const SortTuple *b)
{
    ScanKey     scanKey = state->scanKeys;
    HeapTuple   ltup;
    HeapTuple   rtup;
    TupleDesc   tupDesc;
    int         nkey;
    int32       compare;

    /* Compare the leading sort key */
    compare = inlineApplySortFunction(&scanKey->sk_func,
                                      state->sortFnKinds[0],
                                      a->datum1, a->isnull1,
                                      b->datum1, b->isnull1);
    if (compare != 0)
        return compare;

    /* Compare additional sort keys */
    ltup = (HeapTuple) a->tuple;
    rtup = (HeapTuple) b->tuple;
    tupDesc = state->tupDesc;
    scanKey++;
    for (nkey = 1; nkey < state->nKeys; nkey++, scanKey++)
    {
        AttrNumber  attno = scanKey->sk_attno;
        Datum       datum1,
                    datum2;
        bool        isnull1,
                    isnull2;

        datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
        datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);

        compare = inlineApplySortFunction(&scanKey->sk_func,
                                          state->sortFnKinds[nkey],
                                          datum1, isnull1,
                                          datum2, isnull2);
        if (compare != 0)
            return compare;
    }

    return 0;
}

static void
copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    HeapTuple   tuple = (HeapTuple) tup;

    /* copy the tuple into sort storage */
    stup->tuple = (void *) heap_copytuple(tuple);
    USEMEM(state, GetMemoryChunkSpace(stup->tuple));
    /* set up first-column key value */
    stup->datum1 = heap_getattr((HeapTuple) stup->tuple,
                                state->scanKeys[0].sk_attno,
                                state->tupDesc,
                                &stup->isnull1);
}

/*
 * We don't bother to write the HeapTupleData part of the tuple.
 */

static void
writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    HeapTuple   tuple = (HeapTuple) stup->tuple;
    unsigned int tuplen;

    tuplen = tuple->t_len + sizeof(tuplen);
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &tuplen, sizeof(tuplen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) tuple->t_data, tuple->t_len);
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &tuplen, sizeof(tuplen));

    FREEMEM(state, GetMemoryChunkSpace(tuple));
    heap_freetuple(tuple);
}

static void
readtup_heap(Tuplesortstate *state, SortTuple *stup,
             int tapenum, unsigned int len)
{
    unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
    HeapTuple   tuple = (HeapTuple) palloc(tuplen);

    USEMEM(state, GetMemoryChunkSpace(tuple));
    /* reconstruct the HeapTupleData portion */
    tuple->t_len = len - sizeof(unsigned int);
    ItemPointerSetInvalid(&(tuple->t_self));
    tuple->t_tableOid = InvalidOid;
    tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
    /* read in the tuple proper */
    if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
                        tuple->t_len) != tuple->t_len)
        elog(ERROR, "unexpected end of data");
    if (state->randomAccess)    /* need trailing length word? */
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
                            sizeof(tuplen)) != sizeof(tuplen))
            elog(ERROR, "unexpected end of data");
    stup->tuple = (void *) tuple;
    /* set up first-column key value */
    stup->datum1 = heap_getattr(tuple,
                                state->scanKeys[0].sk_attno,
                                state->tupDesc,
                                &stup->isnull1);
}

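/*
 * Illustrative sketch (not part of the original file): the layout trick in
 * readtup_heap(). The HeapTupleData header and the tuple body are palloc'd
 * as a single chunk, with t_data pointing just past the header, so one
 * pfree releases both:
 *
 *	+----------------+----------------------+
 *	| HeapTupleData  | tuple body (t_len)   |
 *	+----------------+----------------------+
 *	^ tuple           ^ tuple->t_data = (char *) tuple + HEAPTUPLESIZE
 */
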
/*
 * Routines specialized for IndexTuple case
 *
 * NOTE: actually, these are specialized for the btree case; it's not
 * clear whether you could use them for a non-btree index. Possibly
 * you'd need to make another set of routines if you needed to sort
 * according to another kind of index.
 */

static int
comparetup_index(Tuplesortstate *state, const SortTuple *a, const SortTuple *b)
{
    /*
     * This is similar to _bt_tuplecompare(), but we have already done the
     * index_getattr calls for the first column, and we need to keep track
     * of whether any null fields are present. Also see the special treatment
     * for equal keys at the end.
     */
    ScanKey     scanKey = state->indexScanKey;
    IndexTuple  tuple1;
    IndexTuple  tuple2;
    int         keysz;
    TupleDesc   tupDes;
    bool        equal_hasnull = false;
    int         nkey;
    int32       compare;

    /* Compare the leading sort key */
    compare = inlineApplySortFunction(&scanKey->sk_func,
                                      SORTFUNC_CMP,
                                      a->datum1, a->isnull1,
                                      b->datum1, b->isnull1);
    if (compare != 0)
        return compare;

    /* they are equal, so we only need to examine one null flag */
    if (a->isnull1)
        equal_hasnull = true;

    /* Compare additional sort keys */
    tuple1 = (IndexTuple) a->tuple;
    tuple2 = (IndexTuple) b->tuple;
    keysz = state->nKeys;
    tupDes = RelationGetDescr(state->indexRel);
    scanKey++;
    for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
    {
        Datum       datum1,
                    datum2;
        bool        isnull1,
                    isnull2;

        datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
        datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);

        /* see comments about NULLs handling in btbuild */

        /* the comparison function is always of CMP type */
        compare = inlineApplySortFunction(&scanKey->sk_func,
                                          SORTFUNC_CMP,
                                          datum1, isnull1,
                                          datum2, isnull2);

        if (compare != 0)
            return compare;     /* done when we find unequal attributes */

        /* they are equal, so we only need to examine one null flag */
        if (isnull1)
            equal_hasnull = true;
    }

    /*
     * If btree has asked us to enforce uniqueness, complain if two equal
     * tuples are detected (unless there was at least one NULL field).
     *
     * It is sufficient to make the test here, because if two tuples are equal
     * they *must* get compared at some stage of the sort --- otherwise the
     * sort algorithm wouldn't have checked whether one must appear before the
     * other.
     *
     * Some rather brain-dead implementations of qsort will sometimes call the
     * comparison routine to compare a value to itself. (At this writing only
     * QNX 4 is known to do such silly things; we don't support QNX anymore,
     * but perhaps the behavior still exists elsewhere.) Don't raise a bogus
     * error in that case.
     */
    if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
        ereport(ERROR,
                (errcode(ERRCODE_UNIQUE_VIOLATION),
                 errmsg("could not create unique index"),
                 errdetail("Table contains duplicated values.")));

    /*
     * If key values are equal, we sort on ItemPointer. This does not affect
     * validity of the finished index, but it offers cheap insurance against
     * performance problems with bad qsort implementations that have trouble
     * with large numbers of equal keys.
     */
    {
        BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
        BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

        if (blk1 != blk2)
            return (blk1 < blk2) ? -1 : 1;
    }
    {
        OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
        OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

        if (pos1 != pos2)
            return (pos1 < pos2) ? -1 : 1;
    }

    return 0;
}

static void
copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    IndexTuple  tuple = (IndexTuple) tup;
    unsigned int tuplen = IndexTupleSize(tuple);
    IndexTuple  newtuple;

    /* copy the tuple into sort storage */
    newtuple = (IndexTuple) palloc(tuplen);
    memcpy(newtuple, tuple, tuplen);
    USEMEM(state, GetMemoryChunkSpace(newtuple));
    stup->tuple = (void *) newtuple;
    /* set up first-column key value */
    stup->datum1 = index_getattr(newtuple,
                                 1,
                                 RelationGetDescr(state->indexRel),
                                 &stup->isnull1);
}

static void
writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    IndexTuple  tuple = (IndexTuple) stup->tuple;
    unsigned int tuplen;

    tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &tuplen, sizeof(tuplen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) tuple, IndexTupleSize(tuple));
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &tuplen, sizeof(tuplen));

    FREEMEM(state, GetMemoryChunkSpace(tuple));
    pfree(tuple);
}

static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
    unsigned int tuplen = len - sizeof(unsigned int);
    IndexTuple  tuple = (IndexTuple) palloc(tuplen);

    USEMEM(state, GetMemoryChunkSpace(tuple));
    if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
                        tuplen) != tuplen)
        elog(ERROR, "unexpected end of data");
    if (state->randomAccess)    /* need trailing length word? */
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
                            sizeof(tuplen)) != sizeof(tuplen))
            elog(ERROR, "unexpected end of data");
    stup->tuple = (void *) tuple;
    /* set up first-column key value */
    stup->datum1 = index_getattr(tuple,
                                 1,
                                 RelationGetDescr(state->indexRel),
                                 &stup->isnull1);
}

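/*
 * Illustrative sketch (not part of the original file): the TID tie-break
 * above is the usual lexicographic composite-key pattern, with the
 * ItemPointer as the lowest-order key. In miniature:
 *
 *	static int
 *	cmp_composite(int k1, int k2,
 *				  unsigned blk1, unsigned blk2,
 *				  unsigned short pos1, unsigned short pos2)
 *	{
 *		if (k1 != k2)
 *			return (k1 < k2) ? -1 : 1;		// primary key decides
 *		if (blk1 != blk2)
 *			return (blk1 < blk2) ? -1 : 1;	// then block number
 *		if (pos1 != pos2)
 *			return (pos1 < pos2) ? -1 : 1;	// then offset number
 *		return 0;
 *	}
 */
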
/*
 * Routines specialized for DatumTuple case
 */

static int
comparetup_datum(Tuplesortstate *state, const SortTuple *a, const SortTuple *b)
{
    return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
                                   a->datum1, a->isnull1,
                                   b->datum1, b->isnull1);
}

static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
    /* Not currently needed */
    elog(ERROR, "copytup_datum() should not be called");
}

static void
writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
    void       *waddr;
    unsigned int tuplen;
    unsigned int writtenlen;

    if (stup->isnull1)
    {
        waddr = NULL;
        tuplen = 0;
    }
    else if (state->datumTypeByVal)
    {
        waddr = &stup->datum1;
        tuplen = sizeof(Datum);
    }
    else
    {
        waddr = DatumGetPointer(stup->datum1);
        tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
        Assert(tuplen != 0);
    }

    writtenlen = tuplen + sizeof(unsigned int);

    LogicalTapeWrite(state->tapeset, tapenum,
                     (void *) &writtenlen, sizeof(writtenlen));
    LogicalTapeWrite(state->tapeset, tapenum,
                     waddr, tuplen);
    if (state->randomAccess)    /* need trailing length word? */
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &writtenlen, sizeof(writtenlen));

    if (stup->tuple)
    {
        FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
        pfree(stup->tuple);
    }
}

static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
    unsigned int tuplen = len - sizeof(unsigned int);

    if (tuplen == 0)
    {
        /* it's NULL */
        stup->datum1 = (Datum) 0;
        stup->isnull1 = true;
        stup->tuple = NULL;
    }
    else if (state->datumTypeByVal)
    {
        Assert(tuplen == sizeof(Datum));
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) &stup->datum1,
                            tuplen) != tuplen)
            elog(ERROR, "unexpected end of data");
        stup->isnull1 = false;
        stup->tuple = NULL;
    }
    else
    {
        void       *raddr = palloc(tuplen);

        if (LogicalTapeRead(state->tapeset, tapenum, raddr,
                            tuplen) != tuplen)
            elog(ERROR, "unexpected end of data");
        stup->datum1 = PointerGetDatum(raddr);
        stup->isnull1 = false;
        stup->tuple = raddr;
        USEMEM(state, GetMemoryChunkSpace(raddr));
    }

    if (state->randomAccess)    /* need trailing length word? */
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
                            sizeof(tuplen)) != sizeof(tuplen))
            elog(ERROR, "unexpected end of data");
}

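/*
 * Illustrative sketch (not part of the original file): the three payload
 * cases writetup_datum()/readtup_datum() distinguish, in miniature
 * (value/typbyval/typlen are hypothetical stand-ins for the state fields):
 *
 *	if (isnull)
 *	{
 *		ptr = NULL;						// NULL: zero-length payload
 *		len = 0;
 *	}
 *	else if (typbyval)
 *	{
 *		ptr = &value;					// pass-by-value: the raw Datum
 *		len = sizeof(Datum);
 *	}
 *	else
 *	{
 *		ptr = DatumGetPointer(value);	// pass-by-reference: the
 *		len = datumGetSize(value, false, typlen);	// pointed-to bytes
 *	}
 *
 * Note that the length word written to tape is len + sizeof(unsigned int),
 * counting itself; so even a NULL datum writes a nonzero word, leaving a
 * length word of zero free to serve as the end-of-run marker (see
 * markrunend/getlen above).
 */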