Skip to content

Refactory repository interface with single pending message and subscriptions management #33

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 7 commits into from
Aug 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/Contracts/MessageProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\InvalidMessageException;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Message;
use PhpMqtt\Client\Subscription;

/**
* Implementations of this interface provide message parsing capabilities.
Expand Down Expand Up @@ -42,7 +43,7 @@ public function tryFindMessageInBuffer(string $buffer, int $bufferLength, string
* @param string $message
* @return Message|null
* @throws InvalidMessageException
* @throws UnexpectedAcknowledgementException
* @throws ProtocolViolationException
* @throws MqttClientException
*/
public function parseAndValidateMessage(string $message): ?Message;
Expand All @@ -58,11 +59,18 @@ public function parseAndValidateMessage(string $message): ?Message;
public function buildConnectMessage(ConnectionSettings $connectionSettings, bool $useCleanSession = false): string;

/**
* Builds a ping message.
* Builds a ping request message.
*
* @return string
*/
public function buildPingMessage(): string;
public function buildPingRequestMessage(): string;

/**
* Builds a ping response message.
*
* @return string
*/
public function buildPingResponseMessage(): string;

/**
* Builds a disconnect message.
Expand All @@ -74,22 +82,22 @@ public function buildDisconnectMessage(): string;
/**
* Builds a subscribe message from the given parameters.
*
* @param int $messageId
* @param string $topic
* @param int $qualityOfService
* @param int $messageId
* @param Subscription[] $subscriptions
* @param bool $isDuplicate
* @return string
*/
public function buildSubscribeMessage(int $messageId, string $topic, int $qualityOfService): string;
public function buildSubscribeMessage(int $messageId, array $subscriptions, bool $isDuplicate = false): string;

/**
* Builds an unsubscribe message from the given parameters.
*
* @param int $messageId
* @param string $topic
* @param bool $isDuplicate
* @param int $messageId
* @param string[] $topics
* @param bool $isDuplicate
* @return string
*/
public function buildUnsubscribeMessage(int $messageId, string $topic, bool $isDuplicate = false): string;
public function buildUnsubscribeMessage(int $messageId, array $topics, bool $isDuplicate = false): string;

/**
* Builds a publish message based on the given parameters.
Expand Down
9 changes: 5 additions & 4 deletions src/Contracts/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\Exceptions\TopicNotSubscribedException;
use PhpMqtt\Client\Exceptions\UnexpectedAcknowledgementException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Exceptions\RepositoryException;

/**
* An interface for the MQTT client.
Expand Down Expand Up @@ -57,6 +57,7 @@ public function isConnected(): bool;
* @param bool $retain
* @return void
* @throws DataTransferException
* @throws RepositoryException
*/
public function publish(string $topic, string $message, int $qualityOfService = 0, bool $retain = false): void;

Expand Down Expand Up @@ -86,6 +87,7 @@ public function publish(string $topic, string $message, int $qualityOfService =
* @param int $qualityOfService
* @return void
* @throws DataTransferException
* @throws RepositoryException
*/
public function subscribe(string $topic, callable $callback, int $qualityOfService = 0): void;

Expand All @@ -95,7 +97,6 @@ public function subscribe(string $topic, callable $callback, int $qualityOfServi
* @param string $topic
* @return void
* @throws DataTransferException
* @throws TopicNotSubscribedException
*/
public function unsubscribe(string $topic): void;

Expand Down Expand Up @@ -137,7 +138,7 @@ public function interrupt(): void;
* @param int|null $queueWaitLimit
* @return void
* @throws DataTransferException
* @throws UnexpectedAcknowledgementException
* @throws ProtocolViolationException
*/
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, int $queueWaitLimit = null): void;

Expand Down
180 changes: 65 additions & 115 deletions src/Contracts/Repository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
namespace PhpMqtt\Client\Contracts;

use DateTime;
use PhpMqtt\Client\Exceptions\PendingPublishConfirmationAlreadyExistsException;
use PhpMqtt\Client\PublishedMessage;
use PhpMqtt\Client\TopicSubscription;
use PhpMqtt\Client\UnsubscribeRequest;
use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException;
use PhpMqtt\Client\Exceptions\PendingMessageNotFoundException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\PendingMessage;
use PhpMqtt\Client\Subscription;

/**
* Implementations of this interface provide storage capabilities to an MQTT client.
Expand All @@ -31,190 +32,139 @@ interface Repository
* but it is currently not being used (i.e. in a resend queue).
*
* @return int
* @throws RepositoryException
*/
public function newMessageId(): int;

/**
* Releases the given message id, allowing it to be reused in the future.
*
* @param int $messageId
* @return void
*/
public function releaseMessageId(int $messageId): void;

/**
* Returns the number of registered topic subscriptions. The method does
* not differentiate between pending and acknowledged subscriptions.
* Returns the number of pending outgoing messages.
*
* @return int
*/
public function countTopicSubscriptions(): int;

/**
* Adds a topic subscription to the repository.
*
* @param TopicSubscription $subscription
* @return void
*/
public function addTopicSubscription(TopicSubscription $subscription): void;
public function countPendingOutgoingMessages(): int;

/**
* Get all topic subscriptions with the given message identifier.
* Gets a pending outgoing message with the given message identifier, if found.
*
* @param int $messageId
* @return TopicSubscription[]
* @return PendingMessage|null
*/
public function getTopicSubscriptionsWithMessageId(int $messageId): array;
public function getPendingOutgoingMessage(int $messageId): ?PendingMessage;

/**
* Find a topic subscription with the given topic.
* Gets a list of pending outgoing messages last sent before the given date time.
*
* @param string $topic
* @return TopicSubscription|null
*/
public function getTopicSubscriptionByTopic(string $topic): ?TopicSubscription;

/**
* Get all topic subscriptions matching the given topic.
*
* @param string $topic
* @return TopicSubscription[]
*/
public function getTopicSubscriptionsMatchingTopic(string $topic): array;

/**
* Removes the topic subscription with the given topic from the repository.
* Returns true if a topic subscription existed and has been removed.
* Otherwise, false is returned.
* If date time is `null`, all pending messages are returned.
*
* @param string $topic
* @return bool
*/
public function removeTopicSubscription(string $topic): bool;

/**
* Returns the number of pending publish messages.
* The messages are returned in the same order they were added to the repository.
*
* @return int
* @param DateTime|null $dateTime
* @return PendingMessage[]
*/
public function countPendingPublishMessages(): int;
public function getPendingOutgoingMessagesLastSentBefore(DateTime $dateTime = null): array;

/**
* Adds a pending published message to the repository.
* Adds a pending outgoing message to the repository.
*
* @param PublishedMessage $message
* @param PendingMessage $message
* @return void
* @throws PendingMessageAlreadyExistsException
*/
public function addPendingPublishedMessage(PublishedMessage $message): void;
public function addPendingOutgoingMessage(PendingMessage $message): void;

/**
* Gets a pending published message with the given message identifier, if found.
* Marks an existing pending outgoing published message as received in the repository.
*
* @param int $messageId
* @return PublishedMessage|null
*/
public function getPendingPublishedMessageWithMessageId(int $messageId): ?PublishedMessage;

/**
* Gets a list of pending published messages last sent before the given date time.
*
* @param DateTime $dateTime
* @return PublishedMessage[]
*/
public function getPendingPublishedMessagesLastSentBefore(DateTime $dateTime): array;

/**
* Marks the pending published message with the given message identifier as received.
* If the message has no QoS level of 2, is not found or has already been received,
* false is returned. Otherwise the result will be true.
* If the message does not exists, an exception is thrown,
* otherwise `true` is returned if the message was marked as received, and `false`
* in case it was already marked as received.
*
* @param int $messageId
* @return bool
* @throws PendingMessageNotFoundException
*/
public function markPendingPublishedMessageAsReceived(int $messageId): bool;
public function markPendingOutgoingPublishedMessageAsReceived(int $messageId): bool;

/**
* Removes a pending published message from the repository. If a pending message
* with the given identifier is found and successfully removed from the repository,
* `true` is returned. Otherwise `false` will be returned.
* Removes a pending outgoing message from the repository.
*
* If a pending message with the given identifier is found and
* successfully removed from the repository, `true` is returned.
* Otherwise `false` will be returned.
*
* @param int $messageId
* @return bool
*/
public function removePendingPublishedMessage(int $messageId): bool;
public function removePendingOutgoingMessage(int $messageId): bool;

/**
* Returns the number of pending unsubscribe requests.
* Returns the number of pending incoming messages.
*
* @return int
*/
public function countPendingUnsubscribeRequests(): int;
public function countPendingIncomingMessages(): int;

/**
* Adds a pending unsubscribe request to the repository.
*
* @param UnsubscribeRequest $request
* @return void
*/
public function addPendingUnsubscribeRequest(UnsubscribeRequest $request): void;

/**
* Gets a pending unsubscribe request with the given message identifier, if found.
* Gets a pending incoming message with the given message identifier, if found.
*
* @param int $messageId
* @return UnsubscribeRequest|null
* @return PendingMessage|null
*/
public function getPendingUnsubscribeRequestWithMessageId(int $messageId): ?UnsubscribeRequest;
public function getPendingIncomingMessage(int $messageId): ?PendingMessage;

/**
* Gets a list of pending unsubscribe requests last sent before the given date time.
* Adds a pending outgoing message to the repository.
*
* @param DateTime $dateTime
* @return UnsubscribeRequest[]
* @param PendingMessage $message
* @return void
* @throws PendingMessageAlreadyExistsException
*/
public function getPendingUnsubscribeRequestsLastSentBefore(DateTime $dateTime): array;
public function addPendingIncomingMessage(PendingMessage $message): void;

/**
* Removes a pending unsubscribe requests from the repository. If a pending request
* with the given identifier is found and successfully removed from the repository,
* `true` is returned. Otherwise `false` will be returned.
* Removes a pending incoming message from the repository.
*
* If a pending message with the given identifier is found and
* successfully removed from the repository, `true` is returned.
* Otherwise `false` will be returned.
*
* @param int $messageId
* @return bool
*/
public function removePendingUnsubscribeRequest(int $messageId): bool;
public function removePendingIncomingMessage(int $messageId): bool;

/**
* Returns the number of pending publish confirmations.
* Returns the number of registered subscriptions.
*
* @return int
*/
public function countPendingPublishConfirmations(): int;
public function countSubscriptions(): int;

/**
* Adds a pending publish confirmation to the repository.
* Adds a subscription to the repository.
*
* @param PublishedMessage $message
* @param Subscription $subscription
* @return void
* @throws PendingPublishConfirmationAlreadyExistsException
*/
public function addPendingPublishConfirmation(PublishedMessage $message): void;
public function addSubscription(Subscription $subscription): void;

/**
* Gets a pending publish confirmation with the given message identifier, if found.
* Gets all subscriptions matching the given criteria.
*
* @param int $messageId
* @return PublishedMessage|null
* @param string|null $topicName
* @param int|null $subscriptionId
* @return Subscription[]
*/
public function getPendingPublishConfirmationWithMessageId(int $messageId): ?PublishedMessage;
public function getMatchingSubscriptions(string $topicName = null, int $subscriptionId = null): array;

/**
* Removes the pending publish confirmation with the given message identifier
* from the repository. This is normally done as soon as a transaction has been
* successfully finished by the publisher.
* Removes the subscription with the given topic filter from the repository.
*
* @param int $messageId
* Returns `true` if a topic subscription existed and has been removed.
* Otherwise, `false` is returned.
*
* @param string $topicFilter
* @return bool
*/
public function removePendingPublishConfirmation(int $messageId): bool;
public function removeSubscription(string $topicFilter): bool;
}
Loading