diff --git a/src/Connection.php b/src/Connection.php index 0b80776..87f5527 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -18,10 +18,15 @@ namespace Throttr\SDK; use Co; +use http\Env\Request; use Swoole\Coroutine\Client; use Swoole\Coroutine\Channel; use Throttr\SDK\Enum\RequestType; use Throttr\SDK\Enum\ValueSize; +use Throttr\SDK\Responses\ChannelResponse; +use Throttr\SDK\Responses\ChannelsResponse; +use Throttr\SDK\Responses\ConnectionResponse; +use Throttr\SDK\Responses\ConnectionsResponse; use Throttr\SDK\Responses\GetResponse; use Throttr\SDK\Responses\InfoResponse; use Throttr\SDK\Responses\ListResponse; @@ -29,6 +34,7 @@ use Throttr\SDK\Responses\StatResponse; use Throttr\SDK\Responses\StatsResponse; use Throttr\SDK\Responses\StatusResponse; +use Throttr\SDK\Responses\WhoamiResponse; /** * Connection @@ -182,6 +188,11 @@ private function processResponses(): void RequestType::INFO => InfoResponse::fromBytes($buffer, $this->size), RequestType::STAT => StatResponse::fromBytes($buffer, $this->size), RequestType::STATS => StatsResponse::fromBytes($buffer, $this->size), + RequestType::CONNECTIONS => ConnectionsResponse::fromBytes($buffer, $this->size), + RequestType::CONNECTION => ConnectionResponse::fromBytes($buffer, $this->size), + RequestType::CHANNELS => ChannelsResponse::fromBytes($buffer, $this->size), + RequestType::CHANNEL => ChannelResponse::fromBytes($buffer, $this->size), + RequestType::WHOAMI => WhoamiResponse::fromBytes($buffer, $this->size), }; if ($response === null) { diff --git a/src/Enum/ConnectionKind.php b/src/Enum/ConnectionKind.php new file mode 100644 index 0000000..d5bcfde --- /dev/null +++ b/src/Enum/ConnectionKind.php @@ -0,0 +1,34 @@ +. + +namespace Throttr\SDK\Enum; + +/** + * Connection kind + */ +enum ConnectionKind: int +{ + /** + * TCP socket + */ + case TCP = 0x00; + + /** + * Unix socket + */ + case UNIX = 0x01; +} diff --git a/src/Enum/ConnectionType.php b/src/Enum/ConnectionType.php new file mode 100644 index 0000000..88cd5ca --- /dev/null +++ b/src/Enum/ConnectionType.php @@ -0,0 +1,34 @@ +. + +namespace Throttr\SDK\Enum; + +/** + * Connection type + */ +enum ConnectionType: int +{ + /** + * Client + */ + case CLIENT = 0x00; + + /** + * Agent + */ + case AGENT = 0x01; +} diff --git a/src/Enum/IpVersion.php b/src/Enum/IpVersion.php new file mode 100644 index 0000000..65b5741 --- /dev/null +++ b/src/Enum/IpVersion.php @@ -0,0 +1,34 @@ +. + +namespace Throttr\SDK\Enum; + +/** + * IpVersion + */ +enum IpVersion: int +{ + /** + * V4 + */ + case V4 = 0x04; + + /** + * V6 + */ + case V6 = 0x06; +} diff --git a/src/Requests/ChannelRequest.php b/src/Requests/ChannelRequest.php new file mode 100644 index 0000000..3a28a5a --- /dev/null +++ b/src/Requests/ChannelRequest.php @@ -0,0 +1,58 @@ +. + +namespace Throttr\SDK\Requests; + +use Throttr\SDK\Enum\RequestType; +use Throttr\SDK\Enum\ValueSize; + +/** + * Connection request + */ +class ChannelRequest extends BaseRequest +{ + /** + * Type + * + * @var RequestType + */ + public RequestType $type = RequestType::CHANNEL; + + /** + * Constructor + * + * @param string $name + */ + public function __construct( + public string $name + ) + { + } + + /** + * To bytes + * + * @param ValueSize $size + * @return string + */ + public function toBytes(ValueSize $size): string + { + return pack(static::pack(ValueSize::UINT8), $this->type->value) . + pack(static::pack(ValueSize::UINT8), strlen($this->name)) . + $this->name; + } +} diff --git a/src/Requests/ChannelsRequest.php b/src/Requests/ChannelsRequest.php new file mode 100644 index 0000000..5ad3df1 --- /dev/null +++ b/src/Requests/ChannelsRequest.php @@ -0,0 +1,52 @@ +. + +namespace Throttr\SDK\Requests; + +use Throttr\SDK\Enum\RequestType; +use Throttr\SDK\Enum\ValueSize; + +/** + * Channels request + */ +class ChannelsRequest extends BaseRequest +{ + /** + * Type + * + * @var RequestType + */ + public RequestType $type = RequestType::CHANNELS; + + /** + * Constructor + */ + public function __construct() + { + } + + /** + * To bytes + * + * @param ValueSize $size + * @return string + */ + public function toBytes(ValueSize $size): string + { + return pack(static::pack(ValueSize::UINT8), $this->type->value); + } +} diff --git a/src/Requests/ConnectionRequest.php b/src/Requests/ConnectionRequest.php new file mode 100644 index 0000000..9212772 --- /dev/null +++ b/src/Requests/ConnectionRequest.php @@ -0,0 +1,57 @@ +. + +namespace Throttr\SDK\Requests; + +use Throttr\SDK\Enum\RequestType; +use Throttr\SDK\Enum\ValueSize; + +/** + * Connection request + */ +class ConnectionRequest extends BaseRequest +{ + /** + * Type + * + * @var RequestType + */ + public RequestType $type = RequestType::CONNECTION; + + /** + * Constructor + * + * @param string $id + */ + public function __construct( + public string $id + ) + { + } + + /** + * To bytes + * + * @param ValueSize $size + * @return string + */ + public function toBytes(ValueSize $size): string + { + return pack(static::pack(ValueSize::UINT8), $this->type->value) . + hex2bin($this->id); + } +} diff --git a/src/Requests/ConnectionsRequest.php b/src/Requests/ConnectionsRequest.php new file mode 100644 index 0000000..47ba2ea --- /dev/null +++ b/src/Requests/ConnectionsRequest.php @@ -0,0 +1,52 @@ +. + +namespace Throttr\SDK\Requests; + +use Throttr\SDK\Enum\RequestType; +use Throttr\SDK\Enum\ValueSize; + +/** + * Connections request + */ +class ConnectionsRequest extends BaseRequest +{ + /** + * Type + * + * @var RequestType + */ + public RequestType $type = RequestType::CONNECTIONS; + + /** + * Constructor + */ + public function __construct() + { + } + + /** + * To bytes + * + * @param ValueSize $size + * @return string + */ + public function toBytes(ValueSize $size): string + { + return pack(static::pack(ValueSize::UINT8), $this->type->value); + } +} diff --git a/src/Requests/WhoamiRequest.php b/src/Requests/WhoamiRequest.php new file mode 100644 index 0000000..50abc88 --- /dev/null +++ b/src/Requests/WhoamiRequest.php @@ -0,0 +1,52 @@ +. + +namespace Throttr\SDK\Requests; + +use Throttr\SDK\Enum\RequestType; +use Throttr\SDK\Enum\ValueSize; + +/** + * Whoami request + */ +class WhoamiRequest extends BaseRequest +{ + /** + * Type + * + * @var RequestType + */ + public RequestType $type = RequestType::WHOAMI; + + /** + * Constructor + */ + public function __construct() + { + } + + /** + * To bytes + * + * @param ValueSize $size + * @return string + */ + public function toBytes(ValueSize $size): string + { + return pack(static::pack(ValueSize::UINT8), $this->type->value); + } +} diff --git a/src/Responses/ChannelResponse.php b/src/Responses/ChannelResponse.php new file mode 100644 index 0000000..d450c5c --- /dev/null +++ b/src/Responses/ChannelResponse.php @@ -0,0 +1,105 @@ +. + +namespace Throttr\SDK\Responses; + +use Throttr\SDK\Enum\KeyType; +use Throttr\SDK\Enum\TTLType; +use Throttr\SDK\Enum\ValueSize; +use Throttr\SDK\Requests\BaseRequest; + +/** + * ChannelsResponse + */ +class ChannelResponse extends Response implements IResponse { + /** + * Constructor + * + * @param string $data + * @param bool $status + * @param array $subscribers + */ + public function __construct(public string $data, public bool $status, public array $subscribers) {} + + /** + * From bytes + * + * @param string $data + * @param ValueSize $size + * @return ChannelsResponse|null + */ + public static function fromBytes(string $data, ValueSize $size) : ChannelResponse|null { + $valueSize = $size->value; + $offset = 0; + + // Less than 1 byte? not enough for status. + if (strlen($data) < 1) return null; + + $status = ord($data[$offset]) === 1; + $offset++; + + if ($status) { + // Less than 1 + N bytes? not enough for number of subscribers. + if (strlen($data) < 1 + 8) return null; + + $subscribers = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + if ($subscribers === 0) return new ChannelResponse($data, true, []); + + $subscribers_container = []; + + for ($i = 0; $i < $subscribers; ++$i) { + // Less than offset + 16 bytes? not enough for connection id. + if (strlen($data) < $offset + 16) return null; + + $id = substr($data, $offset, 16); + $offset += 16; + + // Less than offset + 8 bytes? not enough for subscribed at. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $subscribed_at = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for read bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $read_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for write bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $write_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + $subscribers_container[] = [ + "id" => bin2hex($id), + "subscribed_at" => $subscribed_at, + "read_bytes" => $read_bytes, + "write_bytes" => $write_bytes, + ]; + } + + return new ChannelResponse($data, true, $subscribers_container); + } + + return new ChannelResponse($data, false, []); + } +} + diff --git a/src/Responses/ChannelsResponse.php b/src/Responses/ChannelsResponse.php new file mode 100644 index 0000000..9e43082 --- /dev/null +++ b/src/Responses/ChannelsResponse.php @@ -0,0 +1,135 @@ +. + +namespace Throttr\SDK\Responses; + +use Throttr\SDK\Enum\KeyType; +use Throttr\SDK\Enum\TTLType; +use Throttr\SDK\Enum\ValueSize; +use Throttr\SDK\Requests\BaseRequest; + +/** + * ChannelsResponse + */ +class ChannelsResponse extends Response implements IResponse { + /** + * Constructor + * + * @param string $data + * @param bool $status + * @param array $channels + */ + public function __construct(public string $data, public bool $status, public array $channels) {} + + /** + * From bytes + * + * @param string $data + * @param ValueSize $size + * @return ChannelsResponse|null + */ + public static function fromBytes(string $data, ValueSize $size) : ChannelsResponse|null { + $valueSize = $size->value; + $offset = 0; + + // Less than 1 byte? not enough for status. + if (strlen($data) < 1) return null; + + $status = ord($data[$offset]) === 1; + $offset++; + + if ($status) { + // Less than 1 + N bytes? not enough for quota. + if (strlen($data) < 1 + 8) return null; + + $fragments = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + if ($fragments === 0) return new ChannelsResponse($data, true, []); + + $channels_container = []; + + for ($i = 0; $i < $fragments; ++$i) { + // Less than offset + 8 bytes? not enough for fragment index. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $fragment = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for fragment keys count. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $number_of_channels = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + $channels_in_fragment = []; + + // Per key in fragment + for ($e = 0; $e < $number_of_channels; ++$e) { + // Less than offset + 1 byte? not enough for key size. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $channel_size = unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]; + $offset += ValueSize::UINT8->value; + + // Less than offset + 8 bytes? not enough for read bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $read_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for write bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $write_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for subscriptions. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $subscriptions = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + $channels_in_fragment[] = [ + "size" => $channel_size, + "read_bytes" => $read_bytes, + "write_bytes" => $write_bytes, + "subscriptions" => $subscriptions, + ]; + } + + $total = array_sum(array_column($channels_in_fragment, 'size')); + + // Less than offset + total channels bytes? not enough for name parsing + if (strlen($data) < $offset + $total) return null; + + for ($e = 0; $e < $number_of_channels; ++$e) { + $channels_in_fragment[$e]["name"] = substr($data, $offset, $channels_in_fragment[$e]["size"]); + $offset += $channels_in_fragment[$e]["size"]; + unset($channels_in_fragment[$e]["size"]); + } + + $channels_container = array_merge($channels_container, $channels_in_fragment); + } + + return new ChannelsResponse($data, true, $channels_container); + } + + return new ChannelsResponse($data, false, []); + } +} + diff --git a/src/Responses/ConnectionResponse.php b/src/Responses/ConnectionResponse.php new file mode 100644 index 0000000..74fef7e --- /dev/null +++ b/src/Responses/ConnectionResponse.php @@ -0,0 +1,187 @@ +. + +namespace Throttr\SDK\Responses; + +use Throttr\SDK\Enum\ConnectionKind; +use Throttr\SDK\Enum\ConnectionType; +use Throttr\SDK\Enum\IpVersion; +use Throttr\SDK\Enum\KeyType; +use Throttr\SDK\Enum\TTLType; +use Throttr\SDK\Enum\ValueSize; +use Throttr\SDK\Requests\BaseRequest; + +/** + * ConnectionResponse + */ +class ConnectionResponse extends Response implements IResponse { + /** + * Constructor + * + * @param string $data + * @param bool $status + * @param array $connection + */ + public function __construct(public string $data, public bool $status, public array $connection) {} + + public static array $types = [ + "INSERT", + "SET", + "QUERY", + "GET", + "UPDATE", + "PURGE", + "LIST", + "INFO", + "STAT", + "STATS", + "PUBLISH", + "SUBSCRIBE", + "UNSUBSCRIBE", + "CONNECTIONS", + "CONNECTION", + "CHANNELS", + "CHANNEL", + "WHOAMI" + ]; + + /** + * From bytes + * + * @param string $data + * @param ValueSize $size + * @return ConnectionResponse|null + */ + public static function fromBytes(string $data, ValueSize $size) : ConnectionResponse|null { + $valueSize = $size->value; + $offset = 0; + + // Less than 1 byte? not enough for status. + if (strlen($data) < 1) return null; + + $status = ord($data[$offset]) === 1; + $offset++; + + if ($status) { + // Less than offset + 16 bytes? not enough for uuid. + if (strlen($data) < $offset + 16) return null; + + $id = substr($data, $offset, 16); + $offset += 16; + + // Less than offset + 1 byte? not enough for type. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $type = ConnectionType::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]); + $offset += ValueSize::UINT8->value; + + // Less than offset + 1 byte? not enough for kind. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $kind = ConnectionKind::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]); + $offset += ValueSize::UINT8->value; + + // Less than offset + 1 byte? not enough for ip version. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $ip_version = IpVersion::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]); + $offset += ValueSize::UINT8->value; + + // Less than offset + 16 bytes? not enough for ip. + if (strlen($data) < $offset + 16) return null; + + $ip = substr($data, $offset, 16); + $offset += 16; + + // Less than offset + 2 bytes? not enough for port. + if (strlen($data) < $offset + ValueSize::UINT16->value) return null; + + $port = unpack(BaseRequest::pack(ValueSize::UINT16), substr($data, $offset, ValueSize::UINT16->value))[1]; + $offset += ValueSize::UINT16->value; + + // Less than offset + 8 bytes? not enough for connected at. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $connected_at = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for read bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $ready_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for write bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $write_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for published bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $published_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for received bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $received_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for allocated bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $allocated_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for consumed bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $consumed_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + $requests = []; + foreach (static::$types as $request_type) { + // Less than offset + 8 bytes? not enough for requests metric. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + $requests[$request_type] = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + } + + return new ConnectionResponse($data, true, [ + "id" => bin2hex($id), + "type" => $type, + "kind" => $kind, + "ip_version" => $ip_version, + "ip" => $ip, + "port" => $port, + "connected_at" => $connected_at, + "read_bytes" => $ready_bytes, + "write_bytes" => $write_bytes, + "published_bytes" => $published_bytes, + "received_bytes" => $received_bytes, + "allocated_bytes" => $allocated_bytes, + "consumed_bytes" => $consumed_bytes, + "requests" => $requests, + ]); + } + + return new ConnectionResponse($data, false, []); + } +} + diff --git a/src/Responses/ConnectionsResponse.php b/src/Responses/ConnectionsResponse.php new file mode 100644 index 0000000..2ecaf18 --- /dev/null +++ b/src/Responses/ConnectionsResponse.php @@ -0,0 +1,210 @@ +. + +namespace Throttr\SDK\Responses; + +use Throttr\SDK\Enum\ConnectionKind; +use Throttr\SDK\Enum\ConnectionType; +use Throttr\SDK\Enum\IpVersion; +use Throttr\SDK\Enum\KeyType; +use Throttr\SDK\Enum\TTLType; +use Throttr\SDK\Enum\ValueSize; +use Throttr\SDK\Requests\BaseRequest; + +/** + * ConnectionsResponse + */ +class ConnectionsResponse extends Response implements IResponse { + /** + * Constructor + * + * @param string $data + * @param bool $status + * @param array $connections + */ + public function __construct(public string $data, public bool $status, public array $connections) {} + + public static array $types = [ + "INSERT", + "SET", + "QUERY", + "GET", + "UPDATE", + "PURGE", + "LIST", + "INFO", + "STAT", + "STATS", + "PUBLISH", + "SUBSCRIBE", + "UNSUBSCRIBE", + "CONNECTION", + "CONNECTIONS", + "CHANNELS", + "CHANNEL", + "WHOAMI" + ]; + + /** + * From bytes + * + * @param string $data + * @param ValueSize $size + * @return ListResponse|null + */ + public static function fromBytes(string $data, ValueSize $size) : ConnectionsResponse|null { + $valueSize = $size->value; + $offset = 0; + + // Less than 1 byte? not enough for status. + if (strlen($data) < 1) return null; + + $status = ord($data[$offset]) === 1; + $offset++; + + if ($status) { + // Less than 1 + N bytes? not enough for quota. + if (strlen($data) < 1 + 8) return null; + + $fragments = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + if ($fragments === 0) return new ConnectionsResponse($data, true, []); + + $connections = []; + + for ($i = 0; $i < $fragments; ++$i) { + // Less than offset + 8 bytes? not enough for fragment index. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $fragment = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for fragment keys count. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $number_of_connections = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Per connection in fragment + for ($e = 0; $e < $number_of_connections; ++$e) { + + // Less than offset + 16 bytes? not enough for uuid. + if (strlen($data) < $offset + 16) return null; + + $id = substr($data, $offset, 16); + $offset += 16; + + // Less than offset + 1 byte? not enough for type. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $type = ConnectionType::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]); + $offset += ValueSize::UINT8->value; + + // Less than offset + 1 byte? not enough for kind. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $kind = ConnectionKind::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]); + $offset += ValueSize::UINT8->value; + + // Less than offset + 1 byte? not enough for ip version. + if (strlen($data) < $offset + ValueSize::UINT8->value) return null; + + $ip_version = IpVersion::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]); + $offset += ValueSize::UINT8->value; + + // Less than offset + 16 bytes? not enough for ip. + if (strlen($data) < $offset + 16) return null; + + $ip = substr($data, $offset, 16); + $offset += 16; + + // Less than offset + 2 bytes? not enough for port. + if (strlen($data) < $offset + ValueSize::UINT16->value) return null; + + $port = unpack(BaseRequest::pack(ValueSize::UINT16), substr($data, $offset, ValueSize::UINT16->value))[1]; + $offset += ValueSize::UINT16->value; + + // Less than offset + 8 bytes? not enough for connected at. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $connected_at = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for read bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $ready_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for write bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $write_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for published bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $published_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for received bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $received_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + // Less than offset + 8 bytes? not enough for allocated bytes. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + + $allocated_bytes = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + + $requests = []; + foreach (static::$types as $request_type) { + // Less than offset + 8 bytes? not enough for requests metric. + if (strlen($data) < $offset + ValueSize::UINT64->value) return null; + $requests[$request_type] = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1]; + $offset += ValueSize::UINT64->value; + } + + $connections[] = [ + "id" => bin2hex($id), + "type" => $type, + "kind" => $kind, + "ip_version" => $ip_version, + "ip" => $ip, + "port" => $port, + "connected_at" => $connected_at, + "read_bytes" => $ready_bytes, + "write_bytes" => $write_bytes, + "published_bytes" => $published_bytes, + "received_bytes" => $received_bytes, + "allocated_bytes" => $allocated_bytes, + "requests" => $requests, + ]; + } + } + + return new ConnectionsResponse($data, true, $connections); + } + + return new ConnectionsResponse($data, false, []); + } +} + diff --git a/src/Responses/StatResponse.php b/src/Responses/StatResponse.php index 12df19e..4af332a 100644 --- a/src/Responses/StatResponse.php +++ b/src/Responses/StatResponse.php @@ -44,8 +44,6 @@ public static function fromBytes(string $data, ValueSize $size) : StatResponse|n $valueSize = $size->value; $offset = 0; - echo bin2hex($data) . "\n"; - // Less than 1 byte? not enough for status. if (strlen($data) < 1) return null; diff --git a/src/Responses/WhoamiResponse.php b/src/Responses/WhoamiResponse.php new file mode 100644 index 0000000..effbe09 --- /dev/null +++ b/src/Responses/WhoamiResponse.php @@ -0,0 +1,57 @@ +. + +namespace Throttr\SDK\Responses; + +use Throttr\SDK\Enum\ValueSize; + +/** + * WhoamiResponse + */ +class WhoamiResponse extends Response implements IResponse { + + /** + * Constructor + * + * @param string $data + * @param bool $status + * @param string $id + */ + public function __construct(public string $data, public bool $status, public string $id) { + } + + /** + * From bytes + * + * @param string $data + * @param ValueSize $size + * @return WhoamiResponse|null + */ + public static function fromBytes(string $data, ValueSize $size) : WhoamiResponse|null { + $offset = 0; + + // Less than 1 byte? not enough for status. + if (strlen($data) < 17) return null; + + $status = ord($data[$offset]) === 1; + + $id = substr($data, 1); + + return new WhoamiResponse($data, $status, bin2hex($id)); + } +} + diff --git a/src/Service.php b/src/Service.php index 59ca9c4..7ecb4eb 100644 --- a/src/Service.php +++ b/src/Service.php @@ -24,6 +24,10 @@ use Throttr\SDK\Enum\ValueSize; use Throttr\SDK\Exceptions\ServiceException; use Throttr\SDK\Requests\BaseRequest; +use Throttr\SDK\Requests\ChannelRequest; +use Throttr\SDK\Requests\ChannelsRequest; +use Throttr\SDK\Requests\ConnectionRequest; +use Throttr\SDK\Requests\ConnectionsRequest; use Throttr\SDK\Requests\GetRequest; use Throttr\SDK\Requests\InfoRequest; use Throttr\SDK\Requests\InsertRequest; @@ -34,6 +38,7 @@ use Throttr\SDK\Requests\StatRequest; use Throttr\SDK\Requests\StatsRequest; use Throttr\SDK\Requests\UpdateRequest; +use Throttr\SDK\Requests\WhoamiRequest; use Throttr\SDK\Responses\GetResponse; use Throttr\SDK\Responses\InfoResponse; use Throttr\SDK\Responses\IResponse; @@ -258,6 +263,31 @@ public function get(string $key): GetResponse return $this->send([$request])[0]; } + public function connections() { + $request = new ConnectionsRequest(); + return $this->send($request)[0]; + } + + public function connection(string $id) { + $request = new ConnectionRequest($id); + return $this->send($request)[0]; + } + + public function whoami() { + $request = new WhoamiRequest(); + return $this->send($request)[0]; + } + + public function channels() { + $request = new ChannelsRequest(); + return $this->send($request)[0]; + } + + public function channel(string $name) { + $request = new ChannelRequest($name); + return $this->send($request)[0]; + } + /** * Send * diff --git a/tests/ServiceTest.php b/tests/ServiceTest.php index 261acaf..2db0267 100644 --- a/tests/ServiceTest.php +++ b/tests/ServiceTest.php @@ -428,4 +428,50 @@ public function testStat() { }); } + public function testWhoami() { + $this->prepares(function (Service $service) { + $whoami = $service->whoami(); + $this->assertTrue($whoami->status); + $this->assertTrue(strlen($whoami->id) == 32); + }); + } + + public function testConnections() { + $this->prepares(function (Service $service) { + $connections = $service->connections(); + $this->assertTrue($connections->status); + $this->assertIsArray($connections->connections); + $this->assertCount(1, $connections->connections); + }); + } + + public function testConnection() { + $this->prepares(function (Service $service) { + $whoami = $service->whoami(); + + $connection = $service->connection($whoami->id); + + $this->assertTrue($connection->status); + $this->assertIsArray($connection->connection); + }); + } + + public function testChannels() { + $this->prepares(function (Service $service) { + $channels = $service->channels(); + $this->assertTrue($channels->status); + $this->assertIsArray($channels->channels); + $this->assertCount(2, $channels->channels); + }); + } + + + public function testChannel() { + $this->prepares(function (Service $service) { + $channel = $service->channel("*"); + $this->assertTrue($channel->status); + $this->assertIsArray($channel->subscribers); + $this->assertCount(1, $channel->subscribers); + }); + } }