Skip to content

Commit

Permalink
Fixed race condition in MsgElement::retrieve.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Dec 13, 2019
1 parent a01379b commit 914341a
Show file tree
Hide file tree
Showing 43 changed files with 871 additions and 151 deletions.
2 changes: 1 addition & 1 deletion admin/tools/docker/index/container/dev/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ FROM qserv/qserv:dev
USER 0

#RUN mv /usr/bin/sh /usr/bin/sh.old && ln -s /usr/bin/bash /usr/bin/sh
RUN yum update --assumeyes && yum install --assumeyes bind-utils
RUN yum update --assumeyes && yum install --assumeyes bind-utils gdb screen

USER 1000

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#! /bin/bash -l
# admin/tools/docker/loader/container/dev/clientNum/appClientNum.bash
# NOTE(review): the path above looks copied from the non-screen variant of this
# script; confirm this file's real location and update it.
#
# Starts appClientNum inside a detached `screen` session, then blocks so this
# script (presumably the container's main process - confirm) stays alive until
# it receives SIGTERM.
#
# Arguments: $1 $2 $3 are passed through unchanged to appClientNum.

SESSION=appClientNum
APP=/home/qserv/dev/qserv/admin/tools/docker/index/container/dev/clientNum/appClientNum

# SIGTERM handler: close the screen session (which ends the program running in
# it) and stop the keep-alive process so the script can exit.
_term() {
  echo "Caught SIGTERM signal!"
  screen -S "$SESSION" -X quit 2>/dev/null
  kill -TERM "$child" 2>/dev/null
}

# Only SIGTERM is trapped; SIGKILL cannot be caught, so trapping it has no effect.
trap _term SIGTERM


echo appClientScreen "$1" "$2" "$3"

# `screen -dm` creates the session detached and returns immediately. It is not a
# background job of this shell, so `$!` cannot be used to track it; the session
# is named instead so the handler can address it.
screen -dmS "$SESSION" "$APP" "$1" "$2" "$3"

# Keep-alive. It runs in the background and is waited on because bash only runs
# a trap handler promptly while blocked in `wait`, not while a foreground
# command is running.
tail -f /dev/null &
child=$!
wait "$child"
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ echo "child ${child}"
wait "$child"

sleep 10000
tail -f /dev/null
19 changes: 19 additions & 0 deletions admin/tools/docker/index/container/dev/worker/appWorkerScreen.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#! /bin/bash
# admin/tools/docker/index/container/dev/worker/appWorkerScreen.bash
#
# Starts appWorker.bash inside a detached `screen` session, then blocks so this
# script (presumably the container's main process - confirm) stays alive until
# it receives SIGTERM.

SESSION=appWorker
APP=/home/qserv/dev/qserv/admin/tools/docker/index/container/dev/worker/appWorker.bash

# SIGTERM handler: close the screen session (which ends the program running in
# it) and stop the keep-alive process so the script can exit.
_term() {
  echo "Caught SIGTERM signal!"
  screen -S "$SESSION" -X quit 2>/dev/null
  kill -TERM "$child" 2>/dev/null
}

# Only SIGTERM is trapped; SIGKILL cannot be caught, so trapping it has no effect.
trap _term SIGTERM

# `screen -dm` creates the session detached and returns immediately. It is not a
# background job of this shell, so `$!` cannot be used to track it; the session
# is named instead so the handler can address it.
screen -dmS "$SESSION" "$APP"

# Keep-alive. It runs in the background and is waited on because bash only runs
# a trap handler promptly while blocked in `wait`, not while a foreground
# command is running.
tail -f /dev/null &
child=$!
echo "child ${child}"
wait "$child"
7 changes: 6 additions & 1 deletion admin/tools/docker/index/index-k8-100m.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ spec:
containers:
- name: imaster-ctr
image: qserv/indexmaster:dev
imagePullPolicy: Always
ports:
- containerPort: 10042
protocol: UDP
Expand Down Expand Up @@ -72,6 +73,7 @@ spec:
containers:
- name: iworker-ctr
image: qserv/indexworker:dev
imagePullPolicy: Always
ports:
- containerPort: 10043
protocol: UDP
Expand Down Expand Up @@ -113,7 +115,8 @@ spec:
containers:
- name: iclientnum-ctr
image: qserv/indexclientnum:dev
args: ["1", "100000000", "client-k8s-a1.cnf"]
imagePullPolicy: Always
args: ["100000000", "1", "client-k8s-a1.cnf"]
ports:
- containerPort: 10050
protocol: UDP
Expand Down Expand Up @@ -153,6 +156,7 @@ spec:
containers:
- name: iclientnum2-ctr
image: qserv/indexclientnum:dev
imagePullPolicy: Always
args: ["200000001", "300000001", "client-k8s-a2.cnf"]
ports:
- containerPort: 10050
Expand Down Expand Up @@ -193,6 +197,7 @@ spec:
containers:
- name: iclientnum3-ctr
image: qserv/indexclientnum:dev
imagePullPolicy: Always
args: ["100000001", "200000000", "client-k8s-a3.cnf"]
ports:
- containerPort: 10050
Expand Down
206 changes: 206 additions & 0 deletions admin/tools/docker/index/index-k8-10m.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Headless Service (clusterIP: None) selecting the imaster pods, UDP port 10042.
apiVersion: v1
kind: Service
metadata:
  name: imaster-svc
  labels:
    app: index
spec:
  ports:
  - port: 10042
    protocol: UDP
  clusterIP: None
  selector:
    app: imaster-pod
---
# StatefulSet running a single replica of the qserv/indexmaster:dev image.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: imaster-sts
  labels:
    app: index
spec:
  serviceName: imaster-svc
  podManagementPolicy: Parallel
  replicas: 1
  selector:
    matchLabels:
      app: imaster-pod
  template:
    metadata:
      labels:
        app: imaster-pod
    spec:
      containers:
      - name: imaster-ctr
        image: qserv/indexmaster:dev
        # Always re-pull so a re-pushed :dev tag is picked up on pod restart.
        imagePullPolicy: Always
        ports:
        - containerPort: 10042
          protocol: UDP
---
# Headless Service (clusterIP: None) selecting the iworker pods, UDP port 10043.
apiVersion: v1
kind: Service
metadata:
  name: iworker-svc
  labels:
    app: index
spec:
  ports:
  - port: 10043
    protocol: UDP
  clusterIP: None
  selector:
    app: iworker-pod
---
# StatefulSet running three replicas of the qserv/indexworker:dev image.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: iworker-sts
  labels:
    app: index
spec:
  serviceName: iworker-svc
  podManagementPolicy: Parallel
  replicas: 3
  selector:
    matchLabels:
      app: iworker-pod
  template:
    metadata:
      labels:
        app: iworker-pod
    spec:
      containers:
      - name: iworker-ctr
        image: qserv/indexworker:dev
        # Always re-pull so a re-pushed :dev tag is picked up on pod restart.
        imagePullPolicy: Always
        ports:
        - containerPort: 10043
          protocol: UDP
        - containerPort: 10143
          protocol: TCP
---
# Headless Service (clusterIP: None) selecting the iclientnum pods, UDP port 10050.
apiVersion: v1
kind: Service
metadata:
  name: iclientnum-svc
  labels:
    app: index
spec:
  ports:
  - port: 10050
    protocol: UDP
  clusterIP: None
  selector:
    app: iclientnum-pod
---
# StatefulSet running a single replica of the qserv/indexclientnum:dev image.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: iclientnum-sts
  labels:
    app: index
spec:
  serviceName: iclientnum-svc
  podManagementPolicy: Parallel
  replicas: 1
  selector:
    matchLabels:
      app: iclientnum-pod
  template:
    metadata:
      labels:
        app: iclientnum-pod
    spec:
      containers:
      - name: iclientnum-ctr
        image: qserv/indexclientnum:dev
        # Always re-pull so a re-pushed :dev tag is picked up on pod restart.
        imagePullPolicy: Always
        # NOTE(review): presumably two key-range bounds followed by the client
        # config file; the first bound is larger than the second here, unlike
        # the other two clients - confirm that this is intended.
        args: ["1000000", "1", "client-k8s-a1.cnf"]
        ports:
        - containerPort: 10050
          protocol: UDP
---
# Headless Service (clusterIP: None) selecting the iclientnum2 pods, UDP port 10050.
apiVersion: v1
kind: Service
metadata:
  name: iclientnum2-svc
  labels:
    app: index
spec:
  ports:
  - port: 10050
    protocol: UDP
  clusterIP: None
  selector:
    app: iclientnum2-pod
---
# StatefulSet running a single replica of the qserv/indexclientnum:dev image.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: iclientnum2-sts
  labels:
    app: index
spec:
  serviceName: iclientnum2-svc
  podManagementPolicy: Parallel
  replicas: 1
  selector:
    matchLabels:
      app: iclientnum2-pod
  template:
    metadata:
      labels:
        app: iclientnum2-pod
    spec:
      containers:
      - name: iclientnum2-ctr
        image: qserv/indexclientnum:dev
        # Always re-pull so a re-pushed :dev tag is picked up on pod restart.
        imagePullPolicy: Always
        # NOTE(review): presumably two key-range bounds followed by the client
        # config file - confirm against the client program.
        args: ["2000001", "3000001", "client-k8s-a2.cnf"]
        ports:
        - containerPort: 10050
          protocol: UDP
---
# Headless Service (clusterIP: None) selecting the iclientnum3 pods, UDP port 10050.
apiVersion: v1
kind: Service
metadata:
  name: iclientnum3-svc
  labels:
    app: index
spec:
  ports:
  - port: 10050
    protocol: UDP
  clusterIP: None
  selector:
    app: iclientnum3-pod
---
# StatefulSet running a single replica of the qserv/indexclientnum:dev image.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: iclientnum3-sts
  labels:
    app: index
spec:
  serviceName: iclientnum3-svc
  podManagementPolicy: Parallel
  replicas: 1
  selector:
    matchLabels:
      app: iclientnum3-pod
  template:
    metadata:
      labels:
        app: iclientnum3-pod
    spec:
      containers:
      - name: iclientnum3-ctr
        image: qserv/indexclientnum:dev
        # Always re-pull so a re-pushed :dev tag is picked up on pod restart.
        imagePullPolicy: Always
        # NOTE(review): presumably two key-range bounds followed by the client
        # config file - confirm against the client program.
        args: ["1000001", "2000000", "client-k8s-a3.cnf"]
        ports:
        - containerPort: 10050
          protocol: UDP


25 changes: 20 additions & 5 deletions core/modules/loader/BufferUdp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket,

// If there's something in the buffer already, get it and return.
// This can happen when the previous read of socket read multiple elements.
MsgElement::Ptr msgElem = _safeRetrieve();
MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket&&&" + note);
if (msgElem != nullptr) {
return msgElem;
}
Expand All @@ -69,7 +69,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket,

/// Try to retrieve an element (there's no guarantee that an entire element got read in a single read).
// Store original cursor positions so they can be restored if the read fails.
msgElem = _safeRetrieve();
msgElem = _safeRetrieve("2readFromSocket&&&" + note);
if (msgElem != nullptr) {
return msgElem;
}
Expand Down Expand Up @@ -117,10 +117,11 @@ void BufferUdp::advanceReadCursor(size_t len) {
}


std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve() {
std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve(std::string const& note) { // &&& delete note, maybe
auto wCursorOriginal = _wCursor;
auto rCursorOriginal = _rCursor;
MsgElement::Ptr msgElem = MsgElement::retrieve(*this);
// throwOnMissing=false since missing data is possible with TCP.
MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve &&&", false);
if (msgElem != nullptr) {
return msgElem;
} else {
Expand All @@ -133,7 +134,20 @@ std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve() {

/// Check whether `len` bytes can be read starting at the read cursor.
/// @param len number of bytes the caller wants to read.
/// @return true if `_rCursor + len` goes past neither `_end` nor `_wCursor`;
///         false otherwise.
/// When the read is not safe, the cursor values, both comparison results and
/// the output of dumpStr(false) are logged at WARN level. &&& temporary diagnostics
bool BufferUdp::isRetrieveSafe(size_t len) const {
    auto newLen = (_rCursor + len);
    // Compute the result once so the diagnostics below are reachable; returning
    // the comparison directly here would skip them.
    bool res = (newLen <= _end && newLen <= _wCursor); // &&&
    if (!res) { // &&&
        // The (void*) casts make the stream print the cursor values as addresses.
        LOGS(_log, LOG_LVL_WARN, "&&& BufferUdp::isRetrieveSafe not safe len=" << len <<
             " rCursor=" << (void*)_rCursor <<
             " newLen=" << (void*)newLen <<
             " wCursor=" << (void*)_wCursor <<
             " _end=" << (void*)_end <<
             " (newLen<=end)=" << (newLen <= _end) <<
             " (newLen<=_wCursor)=" << (newLen <= _wCursor) <<
             " res=" << res);
        LOGS(_log, LOG_LVL_WARN, "&&& BufferUdp::isRetrieveSafe " << dumpStr(false));
    }
    return res;
}


Expand All @@ -143,6 +157,7 @@ bool BufferUdp::retrieve(void* out, size_t len) {
_rCursor += len;
return true;
}
LOGS(_log, LOG_LVL_WARN, "&&& BufferUdp::retrieve not safe len=" << len);
return false;
}

Expand Down
5 changes: 3 additions & 2 deletions core/modules/loader/BufferUdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class MsgElement;


/// A buffer for reading and writing. Nothing can be read from the buffer until
/// something has written to it.
/// something has been written to it.
/// TODO: rename this class; the name BufferUdp is not really accurate anymore. &&&
class BufferUdp {
public:
using Ptr = std::shared_ptr<BufferUdp>;
Expand Down Expand Up @@ -154,7 +155,7 @@ class BufferUdp {
/// MsgElement is available. If so, return the element and advance _rCursor.
/// Otherwise return nullptr.
/// If a message is not recovered, the buffer is left effectively unchanged.
std::shared_ptr<MsgElement> _safeRetrieve();
std::shared_ptr<MsgElement> _safeRetrieve(std::string const& note);


char* _buffer;
Expand Down
1 change: 1 addition & 0 deletions core/modules/loader/Central.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ void Central::_checkDoList() {
while(_loop) {
// Run and then sleep for a second. TODO A more advanced timer should be used
doList->checkList();
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
usleep(_loopSleepTime);
}
}
Expand Down
Loading

0 comments on commit 914341a

Please sign in to comment.