Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ jobs:
passedTests=$(echo "$OUTPUT" | sed -nE 's/.*Total: ([0-9]+) passed.*/\1/p')
passedTests=${passedTests:-0}

REQUIRED_TESTS_TO_PASS=22
REQUIRED_TESTS_TO_PASS=25
echo "Required tests to pass: $REQUIRED_TESTS_TO_PASS"
[ "$passedTests" -ge "$REQUIRED_TESTS_TO_PASS" ] || exit $exit_code

Expand Down
66 changes: 66 additions & 0 deletions src/Capability/Registry.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
use Mcp\Exception\PromptNotFoundException;
use Mcp\Exception\ResourceNotFoundException;
use Mcp\Exception\ToolNotFoundException;
use Mcp\Schema\Notification\ResourceUpdatedNotification;
use Mcp\Schema\Page;
use Mcp\Schema\Prompt;
use Mcp\Schema\Resource;
use Mcp\Schema\ResourceTemplate;
use Mcp\Schema\Tool;
use Mcp\Server\Protocol;
use Mcp\Server\Session\SessionFactoryInterface;
use Mcp\Server\Session\SessionInterface;
use Mcp\Server\Session\SessionStoreInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Psr\SimpleCache\InvalidArgumentException;

/**
* Registry implementation that manages MCP element registration and access.
Expand Down Expand Up @@ -64,6 +70,8 @@ final class Registry implements RegistryInterface
public function __construct(
private readonly ?EventDispatcherInterface $eventDispatcher = null,
private readonly LoggerInterface $logger = new NullLogger(),
private readonly ?SessionStoreInterface $sessionStore = null,
private readonly ?SessionFactoryInterface $sessionFactory = null,
private readonly NameValidator $nameValidator = new NameValidator(),
) {
}
Expand Down Expand Up @@ -391,6 +399,64 @@ public function setDiscoveryState(DiscoveryState $state): void
}
}

/**
* @throws InvalidArgumentException
*/
public function subscribe(SessionInterface $session, string $uri): void
{
$subscriptions = $session->get('resource_subscriptions', []);
$subscriptions[$uri] = true;
$session->set('resource_subscriptions', $subscriptions);
$session->save();
}

/**
* @throws InvalidArgumentException
*/
public function unsubscribe(SessionInterface $session, string $uri): void
{
$subscriptions = $session->get('resource_subscriptions', []);
unset($subscriptions[$uri]);
$session->set('resource_subscriptions', $subscriptions);
$session->save();
}

public function notifyResourceChanged(Protocol $protocol, string $uri): void
{
if (!$this->sessionStore || !$this->sessionFactory) {
$this->logger->warning('Cannot send resource notifications: session store or factory not configured.');

return;
}

foreach ($this->sessionStore->getAllSessionIds() as $sessionId) {
try {
$sessionData = $this->sessionStore->read($sessionId);
if (!$sessionData) {
continue;
}

$sessionArray = json_decode($sessionData, true);
if (!\is_array($sessionArray)) {
continue;
}

if (!isset($sessionArray['resource_subscriptions'][$uri])) {
continue;
}

$session = $this->sessionFactory->createWithId($sessionId, $this->sessionStore);
$protocol->sendNotification(new ResourceUpdatedNotification($uri), $session);
} catch (\Throwable $e) {
$this->logger->error('Error sending resource notification to session', [
'session_id' => $sessionId->toRfc4122(),
'uri' => $uri,
'exception' => $e,
]);
}
}
}

/**
* Calculate next cursor for pagination.
*
Expand Down
18 changes: 18 additions & 0 deletions src/Capability/RegistryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
use Mcp\Schema\Resource;
use Mcp\Schema\ResourceTemplate;
use Mcp\Schema\Tool;
use Mcp\Server\Protocol;
use Mcp\Server\Session\SessionInterface;

/**
* @phpstan-import-type Handler from ElementReference
Expand Down Expand Up @@ -157,4 +159,20 @@ public function getPrompts(?int $limit = null, ?string $cursor = null): Page;
* @throws PromptNotFoundException
*/
public function getPrompt(string $name): PromptReference;

/**
* Subscribes a session to a specific resource URI.
*/
public function subscribe(SessionInterface $session, string $uri): void;

/**
* Unsubscribes a session from a specific resource URI.
*/
public function unsubscribe(SessionInterface $session, string $uri): void;

/**
* Notifies all sessions subscribed to the given resource URI that the
* resource has changed. Sends a ResourceUpdatedNotification for each subscriber.
*/
public function notifyResourceChanged(Protocol $protocol, string $uri): void;
}
19 changes: 14 additions & 5 deletions src/Server/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,17 @@ public function build(): Server
{
$logger = $this->logger ?? new NullLogger();
$container = $this->container ?? new Container();
$registry = $this->registry ?? new Registry($this->eventDispatcher, $logger);

$sessionTtl = $this->sessionTtl ?? 3600;
$sessionFactory = $this->sessionFactory ?? new SessionFactory();
$sessionStore = $this->sessionStore ?? new InMemorySessionStore($sessionTtl);

$registry = $this->registry ?? new Registry(
$this->eventDispatcher,
$logger,
$sessionStore,
$sessionFactory
);

$loaders = [
...$this->loaders,
Expand All @@ -504,16 +514,13 @@ public function build(): Server
$loader->load($registry);
}

$sessionTtl = $this->sessionTtl ?? 3600;
$sessionFactory = $this->sessionFactory ?? new SessionFactory();
$sessionStore = $this->sessionStore ?? new InMemorySessionStore($sessionTtl);
$messageFactory = MessageFactory::make();

$capabilities = $this->serverCapabilities ?? new ServerCapabilities(
tools: $registry->hasTools(),
toolsListChanged: $this->eventDispatcher instanceof EventDispatcherInterface,
resources: $registry->hasResources() || $registry->hasResourceTemplates(),
resourcesSubscribe: false,
resourcesSubscribe: $registry->hasResources() || $registry->hasResourceTemplates(),
resourcesListChanged: $this->eventDispatcher instanceof EventDispatcherInterface,
prompts: $registry->hasPrompts(),
promptsListChanged: $this->eventDispatcher instanceof EventDispatcherInterface,
Expand All @@ -536,6 +543,8 @@ public function build(): Server
new Handler\Request\ListToolsHandler($registry, $this->paginationLimit),
new Handler\Request\PingHandler(),
new Handler\Request\ReadResourceHandler($registry, $referenceHandler, $logger),
new Handler\Request\ResourceSubscribeHandler($registry, $logger),
new Handler\Request\ResourceUnsubscribeHandler($registry, $logger),
new Handler\Request\SetLogLevelHandler(),
]);

Expand Down
66 changes: 66 additions & 0 deletions src/Server/Handler/Request/ResourceSubscribeHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

/*
* This file is part of the official PHP MCP SDK.
*
* A collaboration between Symfony and the PHP Foundation.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Mcp\Server\Handler\Request;

use Mcp\Capability\RegistryInterface;
use Mcp\Exception\ResourceNotFoundException;
use Mcp\Schema\JsonRpc\Error;
use Mcp\Schema\JsonRpc\Request;
use Mcp\Schema\JsonRpc\Response;
use Mcp\Schema\Request\ResourceSubscribeRequest;
use Mcp\Schema\Result\EmptyResult;
use Mcp\Server\Session\SessionInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

/**
* @implements RequestHandlerInterface<EmptyResult>
*
* @author Larry Sule-balogun <[email protected]>
*/
final class ResourceSubscribeHandler implements RequestHandlerInterface
{
public function __construct(
private readonly RegistryInterface $registry,
private readonly LoggerInterface $logger = new NullLogger(),
) {
}

public function supports(Request $request): bool
{
return $request instanceof ResourceSubscribeRequest;
}

public function handle(Request $request, SessionInterface $session): Response|Error
{
\assert($request instanceof ResourceSubscribeRequest);

$uri = $request->uri;

try {
$this->registry->getResource($uri);
} catch (ResourceNotFoundException $e) {
$this->logger->error('Resource not found', ['uri' => $uri]);

return Error::forResourceNotFound($e->getMessage(), $request->getId());
}

$this->logger->debug('Subscribing to resource', ['uri' => $uri]);

$this->registry->subscribe($session, $uri);

return new Response(
$request->getId(),
new EmptyResult(),
);
}
}
66 changes: 66 additions & 0 deletions src/Server/Handler/Request/ResourceUnsubscribeHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

/*
* This file is part of the official PHP MCP SDK.
*
* A collaboration between Symfony and the PHP Foundation.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Mcp\Server\Handler\Request;

use Mcp\Capability\RegistryInterface;
use Mcp\Exception\ResourceNotFoundException;
use Mcp\Schema\JsonRpc\Error;
use Mcp\Schema\JsonRpc\Request;
use Mcp\Schema\JsonRpc\Response;
use Mcp\Schema\Request\ResourceUnsubscribeRequest;
use Mcp\Schema\Result\EmptyResult;
use Mcp\Server\Session\SessionInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

/**
* @implements RequestHandlerInterface<EmptyResult>
*
* @author Larry Sule-balogun <[email protected]>
*/
final class ResourceUnsubscribeHandler implements RequestHandlerInterface
{
public function __construct(
private readonly RegistryInterface $registry,
private readonly LoggerInterface $logger = new NullLogger(),
) {
}

public function supports(Request $request): bool
{
return $request instanceof ResourceUnsubscribeRequest;
}

public function handle(Request $request, SessionInterface $session): Response|Error
{
\assert($request instanceof ResourceUnsubscribeRequest);

$uri = $request->uri;

try {
$this->registry->getResource($uri);
} catch (ResourceNotFoundException $e) {
$this->logger->error('Resource not found', ['uri' => $uri]);

return Error::forResourceNotFound($e->getMessage(), $request->getId());
}

$this->logger->debug('Unsubscribing from resource', ['uri' => $uri]);

$this->registry->unsubscribe($session, $uri);

return new Response(
$request->getId(),
new EmptyResult(),
);
}
}
38 changes: 38 additions & 0 deletions src/Server/Session/FileSessionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,44 @@ public function gc(): array
return $deleted;
}

public function getAllSessionIds(): array
{
$sessionIds = [];
$now = $this->clock->now()->getTimestamp();

$dir = @opendir($this->directory);
if (false === $dir) {
return $sessionIds;
}

while (($entry = readdir($dir)) !== false) {
// Skip dot entries
if ('.' === $entry || '..' === $entry) {
continue;
}

$path = $this->directory.\DIRECTORY_SEPARATOR.$entry;
if (!is_file($path)) {
continue;
}

$mtime = @filemtime($path) ?: 0;
if (($now - $mtime) > $this->ttl) {
continue;
}

try {
$sessionIds[] = Uuid::fromString($entry);
} catch (\Throwable) {
// ignore non-UUID file names
}
}

closedir($dir);

return $sessionIds;
}

private function pathFor(Uuid $id): string
{
return $this->directory.\DIRECTORY_SEPARATOR.$id->toRfc4122();
Expand Down
10 changes: 10 additions & 0 deletions src/Server/Session/InMemorySessionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,14 @@ public function gc(): array

return $deletedSessions;
}

public function getAllSessionIds(): array
{
$ids = [];
foreach (array_keys($this->store) as $id) {
$ids[] = Uuid::fromString($id);
}

return $ids;
}
}
Loading