mirror of
https://github.com/nextcloud/server.git
synced 2026-05-28 04:32:30 -04:00
feat: support migrating an instance to sharding
Signed-off-by: Robin Appelman <robin@icewind.nl>
This commit is contained in:
parent
77db5c76e8
commit
ea678c0280
6 changed files with 52 additions and 19 deletions
|
|
@ -160,7 +160,7 @@ class Connection extends PrimaryReadReplicaConnection {
|
|||
$this->_config->setSQLLogger($debugStack);
|
||||
}
|
||||
|
||||
/** @var array<string, array{shards: array[], mapper: ?string}> $shardConfig */
|
||||
/** @var array<string, array{shards: array[], mapper: ?string, from_primary_key: ?int, from_shard_key: ?int}> $shardConfig */
|
||||
$shardConfig = $this->params['sharding'] ?? [];
|
||||
$shardNames = array_keys($shardConfig);
|
||||
$this->shards = array_map(function (array $config, string $name) {
|
||||
|
|
@ -180,7 +180,9 @@ class Connection extends PrimaryReadReplicaConnection {
|
|||
self::SHARD_PRESETS[$name]['shard_key'],
|
||||
$shardMapper,
|
||||
self::SHARD_PRESETS[$name]['companion_tables'],
|
||||
$config['shards']
|
||||
$config['shards'],
|
||||
$config['from_primary_key'] ?? 0,
|
||||
$config['from_shard_key'] ?? 0,
|
||||
);
|
||||
}, $shardConfig, $shardNames);
|
||||
$this->shards = array_combine($shardNames, $this->shards);
|
||||
|
|
@ -199,8 +201,10 @@ class Connection extends PrimaryReadReplicaConnection {
|
|||
if ($this->isShardingEnabled) {
|
||||
foreach ($this->shards as $shardDefinition) {
|
||||
foreach ($shardDefinition->getAllShards() as $shard) {
|
||||
/** @var ConnectionAdapter $connection */
|
||||
$connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
|
||||
if ($shard !== ShardDefinition::MIGRATION_SHARD) {
|
||||
/** @var ConnectionAdapter $connection */
|
||||
$connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class AutoIncrementHandler {
|
|||
}
|
||||
|
||||
// discard the encoded initial shard
|
||||
$current = $this->getMaxFromDb($shardDefinition) >> 8;
|
||||
$current = $this->getMaxFromDb($shardDefinition);
|
||||
$next = max($current, self::MIN_VALID_KEY) + 1;
|
||||
if ($cache->cas($shardDefinition->table, 'empty-placeholder', $next)) {
|
||||
return $next;
|
||||
|
|
@ -131,19 +131,22 @@ class AutoIncrementHandler {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the maximum primary key value from the shards
|
||||
* Get the maximum primary key value from the shards, note that this has already stripped any embedded shard id
|
||||
*/
|
||||
private function getMaxFromDb(ShardDefinition $shardDefinition): int {
|
||||
$max = 0;
|
||||
$max = $shardDefinition->fromFileId;
|
||||
$query = $this->shardConnectionManager->getConnection($shardDefinition, 0)->getQueryBuilder();
|
||||
$query->select($shardDefinition->primaryKey)
|
||||
->from($shardDefinition->table)
|
||||
->orderBy($shardDefinition->primaryKey, 'DESC')
|
||||
->setMaxResults(1);
|
||||
foreach ($shardDefinition->getAllShards() as $shard) {
|
||||
$connection = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
|
||||
$query = $connection->getQueryBuilder();
|
||||
$query->select($shardDefinition->primaryKey)
|
||||
->from($shardDefinition->table)
|
||||
->orderBy($shardDefinition->primaryKey, 'DESC')
|
||||
->setMaxResults(1);
|
||||
$result = $query->executeQuery()->fetchOne();
|
||||
$result = $query->executeQuery($connection)->fetchOne();
|
||||
if ($result) {
|
||||
if ($result > $shardDefinition->fromFileId) {
|
||||
$result = $result >> 8;
|
||||
}
|
||||
$max = max($max, $result);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,8 +28,17 @@ class ShardConnectionManager {
|
|||
|
||||
public function getConnection(ShardDefinition $shardDefinition, int $shard): IDBConnection {
|
||||
$connectionKey = $shardDefinition->table . '_' . $shard;
|
||||
if (!isset($this->connections[$connectionKey])) {
|
||||
|
||||
if (isset($this->connections[$connectionKey])) {
|
||||
return $this->connections[$connectionKey];
|
||||
}
|
||||
|
||||
if ($shard === ShardDefinition::MIGRATION_SHARD) {
|
||||
$this->connections[$connectionKey] = \OC::$server->get(IDBConnection::class);
|
||||
} elseif (isset($shardDefinition->shards[$shard])) {
|
||||
$this->connections[$connectionKey] = $this->createConnection($shardDefinition->shards[$shard]);
|
||||
} else {
|
||||
throw new \InvalidArgumentException("invalid shard key $shard only " . count($shardDefinition->shards) . ' configured');
|
||||
}
|
||||
|
||||
return $this->connections[$connectionKey];
|
||||
|
|
|
|||
|
|
@ -15,7 +15,9 @@ use OCP\DB\QueryBuilder\Sharded\IShardMapper;
|
|||
*/
|
||||
class ShardDefinition {
|
||||
// we reserve the bottom byte of the primary key for the initial shard, so the total shard count is limited to what we can fit there
|
||||
public const MAX_SHARDS = 256;
|
||||
// additionally, shard id 255 is reserved for migration purposes
|
||||
public const MAX_SHARDS = 255;
|
||||
public const MIGRATION_SHARD = 255;
|
||||
|
||||
public const PRIMARY_KEY_MASK = 0x7F_FF_FF_FF_FF_FF_FF_00;
|
||||
public const PRIMARY_KEY_SHARD_MASK = 0x00_00_00_00_00_00_00_FF;
|
||||
|
|
@ -37,8 +39,10 @@ class ShardDefinition {
|
|||
public array $companionKeys,
|
||||
public string $shardKey,
|
||||
public IShardMapper $shardMapper,
|
||||
public array $companionTables = [],
|
||||
public array $shards = [],
|
||||
public array $companionTables,
|
||||
public array $shards,
|
||||
public int $fromFileId,
|
||||
public int $fromStorageId,
|
||||
) {
|
||||
if (count($this->shards) >= self::MAX_SHARDS) {
|
||||
throw new \Exception('Only allowed maximum of ' . self::MAX_SHARDS . ' shards allowed');
|
||||
|
|
@ -53,11 +57,21 @@ class ShardDefinition {
|
|||
}
|
||||
|
||||
public function getShardForKey(int $key): int {
|
||||
if ($key < $this->fromStorageId) {
|
||||
return self::MIGRATION_SHARD;
|
||||
}
|
||||
return $this->shardMapper->getShardForKey($key, count($this->shards));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list<int>
|
||||
*/
|
||||
public function getAllShards(): array {
|
||||
return array_keys($this->shards);
|
||||
if ($this->fromStorageId !== 0) {
|
||||
return array_merge(array_keys($this->shards), [self::MIGRATION_SHARD]);
|
||||
} else {
|
||||
return array_keys($this->shards);
|
||||
}
|
||||
}
|
||||
|
||||
public function isKey(string $column): bool {
|
||||
|
|
|
|||
|
|
@ -55,6 +55,9 @@ class ShardQueryRunner {
|
|||
private function getLikelyShards(array $primaryKeys): array {
|
||||
$shards = [];
|
||||
foreach ($primaryKeys as $primaryKey) {
|
||||
if ($primaryKey < $this->shardDefinition->fromFileId && !in_array(ShardDefinition::MIGRATION_SHARD, $shards)) {
|
||||
$shards[] = ShardDefinition::MIGRATION_SHARD;
|
||||
}
|
||||
$encodedShard = $primaryKey & ShardDefinition::PRIMARY_KEY_SHARD_MASK;
|
||||
if ($encodedShard < count($this->shardDefinition->shards) && !in_array($encodedShard, $shards)) {
|
||||
$shards[] = $encodedShard;
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ class SharedQueryBuilderTest extends TestCase {
|
|||
return new ShardedQueryBuilder(
|
||||
$this->connection->getQueryBuilder(),
|
||||
[
|
||||
new ShardDefinition($table, $primaryColumn, [], $shardColumn, new RoundRobinShardMapper(), $companionTables, []),
|
||||
new ShardDefinition($table, $primaryColumn, [], $shardColumn, new RoundRobinShardMapper(), $companionTables, [], 0, 0),
|
||||
],
|
||||
$this->createMock(ShardConnectionManager::class),
|
||||
$this->autoIncrementHandler,
|
||||
|
|
|
|||
Loading…
Reference in a new issue