Skip to content

Commit

Permalink
Merge pull request #1372 from ZzzCrazyPig/1.6
Browse files Browse the repository at this point in the history
fix #1370
  • Loading branch information
mycatmerger authored Feb 19, 2017
2 parents f2c5b17 + 79c808e commit de2d150
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/
package io.mycat.backend.mysql.nio.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -32,13 +33,13 @@

import io.mycat.MycatServer;
import io.mycat.backend.BackendConnection;
import io.mycat.backend.ConnectionMeta;
import io.mycat.backend.datasource.PhysicalDBNode;
import io.mycat.cache.CachePool;
import io.mycat.config.MycatConfig;
import io.mycat.net.mysql.ErrorPacket;
import io.mycat.net.mysql.RowDataPacket;
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;

/**
Expand All @@ -56,6 +57,67 @@ public class FetchStoreNodeOfChildTableHandler implements ResponseHandler {
private volatile String dataNode;
private AtomicInteger finished = new AtomicInteger(0);
protected final ReentrantLock lock = new ReentrantLock();

public String execute(String schema, String sql, List<String> dataNodes, ServerConnection sc) {

String key = schema + ":" + sql;
CachePool cache = MycatServer.getInstance().getCacheService()
.getCachePool("ER_SQL2PARENTID");
String result = (String) cache.get(key);
if (result != null) {
return result;
}
this.sql = sql;
int totalCount = dataNodes.size();
long startTime = System.currentTimeMillis();
long endTime = startTime + 5 * 60 * 1000L;
MycatConfig conf = MycatServer.getInstance().getConfig();

LOGGER.debug("find child node with sql:" + sql);
for (String dn : dataNodes) {
if (dataNode != null) {
return dataNode;
}
PhysicalDBNode mysqlDN = conf.getDataNodes().get(dn);
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("execute in datanode " + dn);
}
RouteResultsetNode node = new RouteResultsetNode(dn, ServerParse.SELECT, sql);
node.setRunOnSlave(false); // 获取 子表节点,最好走master为好

/*
* fix #1370 默认应该先从已经持有的连接中取连接, 否则可能因为事务隔离性看不到当前事务内更新的数据
* Tips: 通过mysqlDN.getConnection获取到的连接不是当前连接
*
*/
BackendConnection conn = sc.getSession2().getTarget(node);
if(sc.getSession2().tryExistsCon(conn, node)) {
_execute(conn, node, sc);
} else {
mysqlDN.getConnection(mysqlDN.getDatabase(), sc.isAutocommit(), node, this, node);
}
} catch (Exception e) {
LOGGER.warn("get connection err " + e);
}
}

while (dataNode == null && System.currentTimeMillis() < endTime) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
break;
}
if (dataNode != null || finished.get() >= totalCount) {
break;
}
}
if (dataNode != null) {
cache.putIfAbsent(key, dataNode);
}
return dataNode;

}

public String execute(String schema, String sql, ArrayList<String> dataNodes) {
String key = schema + ":" + sql;
Expand Down Expand Up @@ -84,7 +146,7 @@ public String execute(String schema, String sql, ArrayList<String> dataNodes) {
RouteResultsetNode node = new RouteResultsetNode(dn, ServerParse.SELECT, sql);
node.setRunOnSlave(false); // 获取 子表节点,最好走master为好

mysqlDN.getConnection(mysqlDN.getDatabase(), true, node, this, dn);
mysqlDN.getConnection(mysqlDN.getDatabase(), true, node, this, node);

// mysqlDN.getConnection(mysqlDN.getDatabase(), true,
// new RouteResultsetNode(dn, ServerParse.SELECT, sql),
Expand Down Expand Up @@ -115,6 +177,15 @@ public String execute(String schema, String sql, ArrayList<String> dataNodes) {
return dataNode;

}

private void _execute(BackendConnection conn, RouteResultsetNode node, ServerConnection sc) {
conn.setResponseHandler(this);
try {
conn.execute(node, sc, sc.isAutocommit());
} catch (IOException e) {
connectionError(e, conn);
}
}

@Override
public void connectionAcquired(BackendConnection conn) {
Expand Down Expand Up @@ -162,7 +233,7 @@ public void rowResponse(byte[] row, BackendConnection conn) {
}
if (result == null) {
result = getColumn(row);
dataNode = (String) conn.getAttachment();
dataNode = ((RouteResultsetNode) conn.getAttachment()).getName();
} else {
LOGGER.warn("find multi data nodes for child table store, sql is: "
+ sql);
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/io/mycat/route/util/RouterUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,8 @@ public static boolean processERChildTable(final SchemaConfig schema, final Strin
@Override
public String call() throws Exception {
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
// return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes(), sc);
}
});

Expand All @@ -1605,6 +1606,9 @@ public void onSuccess(String result) {
StringBuilder s = new StringBuilder();
LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +
" err:" + "can't find (root) parent sharding node for sql:" + origSQL);
if(!sc.isAutocommit()) { // 处于事务下失败, 必须回滚
sc.setTxInterrupt("can't find (root) parent sharding node for sql:" + origSQL);
}
sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can't find (root) parent sharding node for sql:" + origSQL);
return;
}
Expand Down

0 comments on commit de2d150

Please sign in to comment.