Skip to content

Commit

Permalink
Merge 'minor-next' into 'major-next'
Browse files Browse the repository at this point in the history
  • Loading branch information
pmmp-admin-bot[bot] committed Dec 24, 2024
2 parents 1c35987 + 8a5eb71 commit 882d8c4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 32 deletions.
30 changes: 21 additions & 9 deletions src/network/mcpe/NetworkSession.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
use pocketmine\utils\TextFormat;
use pocketmine\world\format\io\GlobalItemDataHandlers;
use pocketmine\world\Position;
use pocketmine\world\World;
use pocketmine\YmlServerProperties;
use function array_map;
use function array_values;
Expand Down Expand Up @@ -1178,15 +1179,32 @@ public function onCloseAllForms() : void{
$this->sendDataPacket(ClientboundCloseFormPacket::create());
}

/**
* @phpstan-param \Closure() : void $onCompletion
*/
private function sendChunkPacket(string $chunkPacket, \Closure $onCompletion, World $world) : void{
$world->timings->syncChunkSend->startTiming();
try{
$this->queueCompressed($chunkPacket);
$onCompletion();
}finally{
$world->timings->syncChunkSend->stopTiming();
}
}

/**
* Instructs the networksession to start using the chunk at the given coordinates. This may occur asynchronously.
* @param \Closure $onCompletion To be called when chunk sending has completed.
* @phpstan-param \Closure() : void $onCompletion
*/
public function startUsingChunk(int $chunkX, int $chunkZ, \Closure $onCompletion) : void{
$world = $this->player->getLocation()->getWorld();
ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ)->onResolve(

$promiseOrPacket = ChunkCache::getInstance($world, $this->compressor)->request($chunkX, $chunkZ);
if(is_string($promiseOrPacket)){
$this->sendChunkPacket($promiseOrPacket, $onCompletion, $world);
return;
}
$promiseOrPacket->onResolve(
//this callback may be called synchronously or asynchronously, depending on whether the promise is resolved yet
function(CompressBatchPromise $promise) use ($world, $onCompletion, $chunkX, $chunkZ) : void{
if(!$this->isConnected()){
Expand All @@ -1204,13 +1222,7 @@ function(CompressBatchPromise $promise) use ($world, $onCompletion, $chunkX, $ch
//to NEEDED if they want to be resent.
return;
}
$world->timings->syncChunkSend->startTiming();
try{
$this->queueCompressed($promise);
$onCompletion();
}finally{
$world->timings->syncChunkSend->stopTiming();
}
$this->sendChunkPacket($promise->getResult(), $onCompletion, $world);
}
);
}
Expand Down
57 changes: 34 additions & 23 deletions src/network/mcpe/cache/ChunkCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
use pocketmine\world\ChunkListenerNoOpTrait;
use pocketmine\world\format\Chunk;
use pocketmine\world\World;
use function is_string;
use function spl_object_id;
use function strlen;

Expand Down Expand Up @@ -69,7 +70,7 @@ public static function pruneCaches() : void{
foreach(self::$instances as $compressorMap){
foreach($compressorMap as $chunkCache){
foreach($chunkCache->caches as $chunkHash => $promise){
if($promise->hasResult()){
if(is_string($promise)){
//Do not clear promises that are not yet fulfilled; they will have requesters waiting on them
unset($chunkCache->caches[$chunkHash]);
}
Expand All @@ -79,8 +80,8 @@ public static function pruneCaches() : void{
}

/**
* @var CompressBatchPromise[]
* @phpstan-var array<int, CompressBatchPromise>
* @var CompressBatchPromise[]|string[]
* @phpstan-var array<int, CompressBatchPromise|string>
*/
private array $caches = [];

Expand All @@ -92,47 +93,57 @@ private function __construct(
private Compressor $compressor
){}

/**
* Requests asynchronous preparation of the chunk at the given coordinates.
*
* @return CompressBatchPromise a promise of resolution which will contain a compressed chunk packet.
*/
public function request(int $chunkX, int $chunkZ) : CompressBatchPromise{
private function prepareChunkAsync(int $chunkX, int $chunkZ, int $chunkHash) : CompressBatchPromise{
$this->world->registerChunkListener($this, $chunkX, $chunkZ);
$chunk = $this->world->getChunk($chunkX, $chunkZ);
if($chunk === null){
throw new \InvalidArgumentException("Cannot request an unloaded chunk");
}
$chunkHash = World::chunkHash($chunkX, $chunkZ);

if(isset($this->caches[$chunkHash])){
++$this->hits;
return $this->caches[$chunkHash];
}

++$this->misses;

$this->world->timings->syncChunkSendPrepare->startTiming();
try{
$this->caches[$chunkHash] = new CompressBatchPromise();
$promise = new CompressBatchPromise();

$this->world->getServer()->getAsyncPool()->submitTask(
new ChunkRequestTask(
$chunkX,
$chunkZ,
DimensionIds::OVERWORLD, //TODO: not hardcode this
$chunk,
$this->caches[$chunkHash],
$promise,
$this->compressor
)
);
$this->caches[$chunkHash] = $promise;
$promise->onResolve(function(CompressBatchPromise $promise) use ($chunkHash) : void{
//the promise may have been discarded or replaced if the chunk was unloaded or modified in the meantime
if(($this->caches[$chunkHash] ?? null) === $promise){
$this->caches[$chunkHash] = $promise->getResult();
}
});

return $this->caches[$chunkHash];
return $promise;
}finally{
$this->world->timings->syncChunkSendPrepare->stopTiming();
}
}

/**
* Requests asynchronous preparation of the chunk at the given coordinates.
*
* @return CompressBatchPromise|string Compressed chunk packet, or a promise for one to be resolved asynchronously.
*/
public function request(int $chunkX, int $chunkZ) : CompressBatchPromise|string{
$chunkHash = World::chunkHash($chunkX, $chunkZ);
if(isset($this->caches[$chunkHash])){
++$this->hits;
return $this->caches[$chunkHash];
}

return $this->prepareChunkAsync($chunkX, $chunkZ, $chunkHash);
}

private function destroy(int $chunkX, int $chunkZ) : bool{
$chunkHash = World::chunkHash($chunkX, $chunkZ);
$existing = $this->caches[$chunkHash] ?? null;
Expand All @@ -148,12 +159,12 @@ private function destroyOrRestart(int $chunkX, int $chunkZ) : void{
$chunkPosHash = World::chunkHash($chunkX, $chunkZ);
$cache = $this->caches[$chunkPosHash] ?? null;
if($cache !== null){
if(!$cache->hasResult()){
if(!is_string($cache)){
//some requesters are waiting for this chunk, so their request needs to be fulfilled
$cache->cancel();
unset($this->caches[$chunkPosHash]);

$this->request($chunkX, $chunkZ)->onResolve(...$cache->getResolveCallbacks());
$this->prepareChunkAsync($chunkX, $chunkZ, $chunkPosHash)->onResolve(...$cache->getResolveCallbacks());
}else{
//dump the cache, it'll be regenerated the next time it's requested
$this->destroy($chunkX, $chunkZ);
Expand Down Expand Up @@ -199,8 +210,8 @@ public function onChunkUnloaded(int $chunkX, int $chunkZ, Chunk $chunk) : void{
public function calculateCacheSize() : int{
$result = 0;
foreach($this->caches as $cache){
if($cache->hasResult()){
$result += strlen($cache->getResult());
if(is_string($cache)){
$result += strlen($cache);
}
}
return $result;
Expand Down

0 comments on commit 882d8c4

Please sign in to comment.