#!/usr/bin/env python2

# first load the standard libraries from python
# we require at least python 2.7
#from sys import *
from __future__ import print_function
import sys

if sys.version_info[0]==2 and not sys.version_info[1]>=7:
    print("Sorry, this program requires at least python 2.7")
    print("You can download a more current python version from python.org and compile it")
    print("into your homedir with 'configure --prefix ~/python'; make;")
    print("then run this program by specifying your own python executable like this: ")
    print("   ~/python/bin/python ~/pubtools/pubtools")
    print("or add python/bin to your PATH before /usr/bin, then run pubtools itself")
    exit(1)
# load default python packages
import logging, optparse, os, glob, zipfile, types, gzip, shutil
from os.path import *
# add <scriptDir>/lib/ to package search path
progFile = os.path.abspath(sys.argv[0])
progDir = os.path.dirname(progFile)
pubToolsLibDir = os.path.join(progDir, "lib")
sys.path.insert(0, pubToolsLibDir)
# now load our own libraries
import pubGeneric, maxRun, pubStore, pubConf, maxCommon, pubXml, pubPubmed, pubCompare
import unidecode
# === CONSTANTS ===================================
# === COMMAND LINE INTERFACE, OPTIONS AND HELP ===
parser = optparse.OptionParser("""usage: %prog [options] <inDir> <outDir> - convert medline .gz files to pubTools format. Also create fingerprints so data sources without PMIDs can look up their PMIDs quickly.
example:
pubConvMedline /hive/data/outside/pubs/medline/ /hive/data/inside/pubs/text/medline
Each input file is converted as a separate cluster job. ArticleIds are assigned per file so that
each chunk starts 300k articleIds (--idsPerFile) after the previous chunk, to avoid overlaps.
Download medline files from the Medline FTP server with the tool 'pubGetMedline',
or alternatively with something like this:
lftp -e 'set net:socket-buffer 4000000; connect ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz; mirror -c --parallel=8 .; quit'
""")
parser.add_option("", "--minId", dest="minId", action="store", help="numerical IDs written to the pubStore start at this number times one billion to prevent overlaps of numerical IDs between publishers, default %default", default=pubConf.identifierStart["medline"])
parser.add_option("", "--idsPerFile", dest="idsPerFile", action="store", help="number of identifiers per medline file. Reserves space for x entries in numerical namespace, default %default", default=300000)
parser.add_option("", "--maxRam", dest="maxRam", type="int", help="number of gigabytes of RAM to request for cluster jobs", default=None)
#parser.add_option("-u", "--updateCrawler", metavar="CRAWLDIR", dest="crawlDir", action="store", help="go over all subdirs of crawlDir, update all pmids.txt with pmids of ISSNs in issns.tab")
parser.add_option("", "--parse", dest="parse", action="store_true", help="for debugging, just parse one single xml file", default=None)
parser.add_option("", "--auto", dest="auto", action="store_true", help="predefine in and out dir based on pubConf.py config file")
parser.add_option("", "--noDb", dest="noDb", action="store_true", help="do not create the sqlite db")
#parser.add_option("-u", "--updateDb", dest="updateDb", action="store_true", help="export new data to sqlite db defined in pubConf")
pubGeneric.addGeneralOptions(parser)
(options, args) = parser.parse_args()
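
# The script runs in two modes (see main() below): invoked with an input and an
# output directory it acts as the master and submits one cluster job per
# medline file; each job then re-invokes this same script with three positional
# arguments, sketched here with placeholder names (the real command line is
# built in submitJobs below):
#   pubConvMedline <medlineFile.xml.gz> <out.articles.gz> <chunkArticleId>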
# ==== FUNCTIONs =====
def submitJobs(inDir, outDir, minId, idStep, headNode):
    " create one job per input medline file, process only new files "
    flagFnames = glob.glob(join(inDir, "newVersion.*"))
    assert len(flagFnames) <= 1
    if len(flagFnames)!=0:
        flagFname = flagFnames[0]
        oldVersionId = flagFname.split(".")[-1]
        oldDirBak = outDir+"."+oldVersionId
        logging.warn("New baseline came in. Removing all old data. Renaming %s to %s" % (outDir, oldDirBak))
        shutil.move(outDir, oldDirBak)
        logging.warn("Recreating clean %s" % outDir)
        os.makedirs(outDir)
        newFlag = join(outDir, basename(flagFname))
        logging.warn("Dropping flagfile %s" % newFlag)
        open(newFlag, "w").write("")
        logging.warn("Removing old flagfile %s" % flagFname)
        os.remove(flagFname)

    # convert into a /build dir first and then move over files if the build was successful
    buildDir = pubGeneric.makeBuildDir(outDir)
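
    # updates.tab in outDir tracks the current updateId, the next free
    # articleId and the input files that were already converted; it is read
    # here and appended to at the end of this function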
    updateId, firstArticleId, doneFiles = pubStore.parseUpdatesTab(outDir, minArticleId=minId)

    medlineFnames = glob.glob(join(inDir, "*.xml.gz"))
    if len(medlineFnames)==0:
        medlineFnames = glob.glob(join(inDir, "*.xml"))
        if len(medlineFnames)==0:
            logging.error("No gz or xml files found in %s" % inDir)
            os.rmdir(buildDir)
            sys.exit(1)

    doneFiles = set(doneFiles)

    runner = pubGeneric.makeClusterRunner(__file__, maxJob=pubConf.convertMaxJob, headNode=options.cluster, maxRam=options.maxRam, outDir=outDir)

    chunkArticleId = firstArticleId
    chunkId = 0
    newFiles = set()
    fCount = 0
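
    # each chunk reserves idStep numeric articleIds; a worked example, assuming
    # firstArticleId=1000000 and the default --idsPerFile of 300000: chunk 0
    # writes articleIds 1000000, 1000001, ... and chunk 1 starts at 1300000,
    # so IDs stay unique as long as no single file holds more than idStep articles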
    for medlineFname in medlineFnames:
        if basename(medlineFname) in doneFiles:
            logging.debug("file %s marked as done in updates.tab" % medlineFname)
            continue
        medlineFname = abspath(medlineFname)
        fCount += 1
        outFname = abspath(os.path.join(buildDir, "%d_%05d.articles.gz" % (updateId, chunkId)))
        maxCommon.mustNotExist(outFname)
        command = "%s %s {check in exists %s} {check out exists+ %s} %d" % \
            (sys.executable, progFile, medlineFname, outFname, chunkArticleId)
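        # the {check in exists ...} / {check out exists+ ...} tokens look like
        # parasol-style file-existence checks (an assumption here; other batch
        # systems configured via pubGeneric.makeClusterRunner may ignore them)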
logging.debug("submitting command: %s" % command)
runner.submit(command)
chunkArticleId += idStep
chunkId += 1
newFiles.add(basename(medlineFname))

    if fCount==0:
        logging.info("All files were already converted, nothing done")
        os.rmdir(buildDir)
        return

    runner.finish(wait=True)
    pubStore.moveFiles(buildDir, outDir)
    pubStore.appendToUpdatesTxt(outDir, updateId, chunkArticleId, newFiles)
    shutil.rmtree(buildDir)
    return updateId

def convertOneChunk(fileMinId, inFile, outFile):
    """
    convert one medline file to one pubtools file
    """
    store = pubStore.PubWriterFile(outFile)
    logging.debug("Reading %s" % inFile)
    if inFile.endswith(".gz"):
        xmlString = gzip.open(inFile).read()
    else:
        xmlString = open(inFile).read()
    logging.debug("Writing to %s" % outFile)

    articleId = int(fileMinId)
    # parse & write to output
    for articleData in pubPubmed.parsePubmedMedlineIter(xmlString):
        logging.debug("Writing article %s" % str(articleId))
        articleData["source"] = "medline"
        articleData["origFile"] = basename(inFile)
        articleData["publisher"] = "ncbi"
        del articleData["mid"] # pubmed does not document what the "mid" identifier could be
        store.writeArticle(articleId, articleData)
        articleId += 1
    store.close()

def debugParser(fname):
    " parse one XML file, for debugging "
    xmlString = open(fname).read()
    # undo html escaping and strip <pre> tags (e.g. XML saved from a browser)
    xmlString = xmlString.replace("&lt;", "<").replace("&gt;", ">")
    xmlString = xmlString.replace("<pre>", "").replace("</pre>", "")
    xmlLines = xmlString.split("\n")
    xmlLines = [x for x in xmlLines if not x.startswith("<!")]
    xmlString = "\n".join(xmlLines)
    print(xmlString)

    # input file can come from medline or pubmed, try medline
    for articleData in pubPubmed.parsePubmedMedlineIter(xmlString, fromMedline=True):
        for key, val in articleData.iteritems():
            print(key, val.encode("utf8"))

    # try pubmed
    for articleData in pubPubmed.parsePubmedMedlineIter(xmlString):
        for key, val in articleData.iteritems():
            print(key, val.encode("utf8"))

    sys.exit(0)

def main(args, options):
    if options.parse!=None:
        debugParser(args[0])

    # normal operation
    inDir, outDir = pubGeneric.setInOutDirs(options.auto, args, "medline")
    maxCommon.mustExist(inDir)
    minId = options.minId
    maxIdPerFile = options.idsPerFile
    pubGeneric.setupLogging(progFile, options)

    if os.path.isdir(inDir):
        maxCommon.mustExistDir(outDir)
        pubGeneric.setLockFile(outDir, "pubConvMedline")
        updateId = submitJobs(inDir, outDir, minId, maxIdPerFile, options.cluster)
        updateIds = [updateId]
        pubCompare.createWriteFingerprints(outDir, updateIds=updateIds)
        if not options.noDb:
            pubStore.updateSqlite(outDir)
    else:
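        # first argument is not a directory: we were started as one of the
        # cluster jobs submitted by submitJobs, with positional arguments
        # <inFile> <outFile> <chunkMinId>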
        inFile = inDir
        outFile = outDir
        chunkMinId = args[2]
        convertOneChunk(int(chunkMinId), inFile, outFile)

# ----------- MAIN --------------
if args==[] and not options.auto:
    parser.print_help()
    exit(1)

main(args, options)