Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: added pdo proxy #30

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
namespace Hyperf\Seata;

use Hyperf\Database\Connectors\Connector;
use Hyperf\Seata\Annotation\GlobalTransactionScanner;
use Hyperf\Seata\Core\Model\ResourceManagerInterface;
use Hyperf\Seata\Listener\InitListener;
Expand All @@ -40,7 +41,7 @@ public function __invoke(): array
ResourceManagerInterface::class => DefaultResourceManager::class,
// GlobalTransactionScanner::class => GlobalTransactionScannerFactory::class,
LoggerInterface::class => StdoutLogger::class,
// DataSourceProxy::class => DataSourceProxyFactory::class,
// DataSourceProxy::class => DataSourceProxyFactory::class,
],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'publish' => [ [ 'id' => 'config', 'description' => 'The config of message client.', 'source' => __DIR__ . '/../publish/seata.php', 'destination' => BASE_PATH . '/config/autoload/seata.php', ], ],这里要不要把publish 完善一下?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hyperf,现在你在 ConfigProvider 加上 class_map 也没用,ConfigProvider 里面我没记错他是没支持 class_map 的

];
}
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Context/GlobalLockConfigHolder.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace Hyperf\Seata\Core\Context;

use Hyperf\Seata\Core\Model\GlobalLockConfig;
use Hyperf\Utils\Context;
use Hyperf\Context\Context;

class GlobalLockConfigHolder
{
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Context/RootContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
use Hyperf\Seata\Logger\LoggerFactory;
use Hyperf\Seata\Logger\LoggerInterface;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Context;
use Hyperf\Context\Context;
use RuntimeException;

class RootContext extends Context
Expand Down
13 changes: 9 additions & 4 deletions src/Core/Rpc/Runtime/Swow/SocketChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
use Hyperf\Seata\Core\Rpc\Runtime\SocketChannelInterface;
use Hyperf\Seata\Core\Rpc\Runtime\V1\ProtocolV1Decoder;
use Hyperf\Seata\Core\Rpc\Runtime\V1\ProtocolV1Encoder;
use Hyperf\Seata\Logger\LoggerFactory;
use Hyperf\Seata\Logger\LoggerInterface;
use Hyperf\Seata\Utils\Buffer\ByteBuffer;
use Hyperf\Seata\Utils\Protocol\RpcMessageUtils;
use Hyperf\Utils\ApplicationContext;
Expand All @@ -49,13 +51,16 @@ class SocketChannel implements SocketChannelInterface

protected Channel $sendChannel;

private LoggerInterface $logger;

public function __construct(Socket $socket, Address $address)
{
$this->socket = $socket;
$this->address = $address;
$container = ApplicationContext::getContainer();
$this->protocolEncoder = $container->get(ProtocolV1Encoder::class);
$this->protocolDecoder = $container->get(ProtocolV1Decoder::class);
$this->logger = $container->get(LoggerFactory::class)->create(static::class);
$this->sendChannel = new Channel();
$this->createRecvLoop();
// $this->createSendLoop();
Expand All @@ -64,7 +69,7 @@ public function __construct(Socket $socket, Address $address)
public function sendSyncWithResponse(RpcMessage $rpcMessage, int $timeoutMillis)
{
$channel = new Channel();
echo 'Ready to send the rpc message #' . RpcMessageUtils::toLogString($rpcMessage) . PHP_EOL;
$this->logger->debug('Ready to send the rpc message #' . RpcMessageUtils::toLogString($rpcMessage));
$this->responses[$rpcMessage->getId()] = $channel;
$this->sendSyncWithoutResponse($rpcMessage, $timeoutMillis);
return $channel->pop();
Expand Down Expand Up @@ -96,7 +101,7 @@ protected function createRecvLoop()
$rpcMessage = $this->protocolDecoder->decode($byteBuffer);
$processorManger->dispatch($this, $rpcMessage);

echo 'Recieved a rpc message #' . RpcMessageUtils::toLogString($rpcMessage) . PHP_EOL;
$this->logger->debug('Recieved a rpc message #' . RpcMessageUtils::toLogString($rpcMessage));
if (isset($this->responses[$rpcMessage->getId()])) {
$responseChannel = $this->responses[$rpcMessage->getId()];
$responseChannel->push($rpcMessage);
Expand All @@ -107,10 +112,10 @@ protected function createRecvLoop()
// // var_dump('heartbeat', $rpcMessage);
// }
} catch (\InvalidArgumentException $exception) {
echo 'Recieved a rpc message fail error:' . $exception->getMessage() . PHP_EOL;
$this->logger->debug('Recieved a rpc message fail error:' . $exception->getMessage());
break;
} catch (\Throwable $exception) {
echo 'Recieved a rpc message fail error:' . $exception->getMessage() . PHP_EOL;
$this->logger->debug( 'Recieved a rpc message fail error:' . $exception->getMessage());
break;
} finally {
isset($rpcMessage) && $this->responses[$rpcMessage->getId()]->close();
Expand Down
10 changes: 1 addition & 9 deletions src/Exception/ExecutionException.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,10 @@ public function setTransaction(GlobalTransaction $transaction): void
$this->transaction = $transaction;
}

/**
* @return int
*/
public function getCode()
{
return $this->code;
}

/**
* @param int $code
*/
public function setCode($code): void
public function setCode(int $code): void
{
$this->code = $code;
}
Expand Down
148 changes: 148 additions & 0 deletions src/Rm/DataSource/ClassMap/Connector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
<?php

declare(strict_types=1);
/**
* Copyright 2019-2022 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Hyperf\Database\Connectors;

use Exception;
use Hyperf\Database\DetectsLostConnections;
use Hyperf\Seata\Rm\PDOProxy;
use PDO;
use Throwable;

class Connector
{
use DetectsLostConnections;

/**
* The default PDO connection options.
*
* @var array
*/
protected $options = [
PDO::ATTR_CASE => PDO::CASE_NATURAL,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
PDO::ATTR_STRINGIFY_FETCHES => false,
PDO::ATTR_EMULATE_PREPARES => false,
];

/**
* Create a new PDO connection.
*
* @param string $dsn
* @throws \Exception
* @return \PDO
*/
public function createConnection($dsn, array $config, array $options)
{
[$username, $password] = [
$config['username'] ?? null, $config['password'] ?? null,
];

try {
return $this->createPdoConnection(
$dsn,
$username,
$password,
$options
);
} catch (Exception $e) {
return $this->tryAgainIfCausedByLostConnection(
$e,
$dsn,
$username,
$password,
$options
);
}
}

/**
* Get the PDO options based on the configuration.
*
* @return array
*/
public function getOptions(array $config)
{
return array_replace($this->options, $config['options'] ?? []);
}

/**
* Get the default PDO connection options.
*
* @return array
*/
public function getDefaultOptions()
{
return $this->options;
}

/**
* Set the default PDO connection options.
*/
public function setDefaultOptions(array $options)
{
$this->options = $options;
}

/**
* Create a new PDO connection instance.
*
* @param string $dsn
* @param string $username
* @param string $password
* @param array $options
* @return \PDO
*/
protected function createPdoConnection($dsn, $username, $password, $options)
{
return new PDOProxy($dsn, $username, $password, $options);
}

/**
* Determine if the connection is persistent.
*
* @param array $options
* @return bool
*/
protected function isPersistentConnection($options)
{
return isset($options[PDO::ATTR_PERSISTENT])
&& $options[PDO::ATTR_PERSISTENT];
}

/**
* Handle an exception that occurred during connect execution.
*
* @param string $dsn
* @param string $username
* @param string $password
* @param array $options
* @throws \Exception
* @return \PDO
*/
protected function tryAgainIfCausedByLostConnection(Throwable $e, $dsn, $username, $password, $options)
{
if ($this->causedByLostConnection($e)) {
return $this->createPdoConnection($dsn, $username, $password, $options);
}

throw $e;
}
}
2 changes: 1 addition & 1 deletion src/Rm/DataSource/ConnectionContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

use Hyperf\Seata\Exception\ShouldNeverHappenException;
use Hyperf\Seata\Rm\DataSource\Undo\SQLUndoLog;
use Hyperf\Utils\Context;
use Hyperf\Context\Context;

class ConnectionContext
{
Expand Down
15 changes: 14 additions & 1 deletion src/Rm/PDOStatementProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public function __call(string $name, array $arguments)
return $this->__object->{$name}(...$arguments);
}

public function fetchAll($mode = PDO::FETCH_BOTH, $fetch_argument = null, ...$args)
{
$args = func_get_args();
return $this->__object->fetchAll(...$args);
}

public function setFetchMode($mode, $className = null, ...$params)
{
$args = func_get_args();
return $this->__object->setFetchMode(...$args);
}

public function bindParam(int|string $param, mixed &$var, int $type = PDO::PARAM_INT, int $maxLength = null, mixed $driverOptions = null)
{
$this->bindParamContext[$param] = [$var, $type, $maxLength, $driverOptions];
Expand All @@ -70,6 +82,7 @@ public function bindValue(int|string $param, mixed $value, int $type = PDO::PARA

public function execute(?array $params = null)
{

if ($this->sqlParser->isDelete()) {
$deleteExecutor = new DeleteExecutor($this->sqlParser, $this->PDOProxy, $this->bindParamContext, $this->bindColumnContext, $this->bindValueContext);
$deleteExecutor->execute($params);
Expand All @@ -80,6 +93,6 @@ public function execute(?array $params = null)

if ($this->sqlParser->isInsert()) {
}
return parent::execute($params);
return $this->__object->execute($params);
}
}
2 changes: 1 addition & 1 deletion src/Tm/Api/Transaction/TransactionHookManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
namespace Hyperf\Seata\Tm\Api\Transaction;

use Hyperf\Utils\Context;
use Hyperf\Context\Context;

class TransactionHookManager
{
Expand Down
28 changes: 7 additions & 21 deletions src/Tm/Api/Transaction/TransactionInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,21 @@

class TransactionInfo
{
/**
* @var int
*/
private $timeOut;
private int $timeOut;

/**
* @var string
*/
private $name;
private string $name;

/**
* @var RollbackRule[]
*/
private $rollbackRules;
private array $rollbackRules = [];

/**
* @var Propagation
*/
private $propagation;
private ?Propagation $propagation = null;

/**
* @var int
*/
private $lockRetryInterval;
private int $lockRetryInterval;

private int $lockRetryTimes;

/**
* @var int
*/
private $lockRetryTimes;

public function getTimeOut(): int
{
Expand Down
9 changes: 5 additions & 4 deletions src/Tm/Api/TransactionalTemplate.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@
use Hyperf\Seata\Exception\ExecutionException;
use Hyperf\Seata\Exception\ShouldNeverHappenException;
use Hyperf\Seata\Exception\TransactionException;
use Hyperf\Seata\Logger\LoggerFactory;
use Hyperf\Seata\Tm\Api\Transaction\Propagation;
use Hyperf\Seata\Tm\Api\Transaction\TransactionHook;
use Hyperf\Seata\Tm\Api\Transaction\TransactionHookManager;
use Hyperf\Seata\Tm\Api\Transaction\TransactionInfo;
use Psr\Log\LoggerInterface;
use Throwable;

class TransactionalTemplate
{
private StdoutLoggerInterface $logger;
private LoggerInterface $logger;

public function __construct(StdoutLoggerInterface $logger)
public function __construct(LoggerFactory $loggerFactory)
{
$this->logger = $logger;
$this->logger = $loggerFactory->create(static::class);
}

public function execute(TransactionalExecutor $business)
Expand Down Expand Up @@ -113,7 +115,6 @@ public function execute(TransactionalExecutor $business)
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
$this->beginTransaction($txInfo, $tx);

$rs = null;
try {
// Do Your Business
Expand Down