add support for user-defined status lines and charts
ben-spiller committed Jun 12, 2020
1 parent 48dea27 commit 7c3cd5a
Showing 8 changed files with 555 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -1,5 +1,6 @@
3.5
---
- Added the ability to extract data from user-defined periodic status lines and to add user-defined charts, with the new configuration settings ``userStatusLines`` and ``userCharts``.
- Fix incorrect time in the chart legend when the date falls within Daylight Saving Time (relative to the locale the web browser is running in). The chart legend is now consistent with the time shown on the x axis.

3.4
89 changes: 89 additions & 0 deletions README.rst
@@ -66,6 +66,95 @@ For information about the meaning of the status lines which may be helpful when

Note that the ``overview.html`` page uses the http://dygraphs.com JavaScript library to display the charts, which is downloaded from the internet when the page is opened, so you will need an internet connection to view ``overview.html`` correctly (though you don't need one to run the analyzer itself).

User status lines
-----------------
In addition to the standard "Correlator status:" lines, the tool can perform similar extraction for any user-defined
status lines. For example this could be used for the correlator's persistence or JMS status lines, or for your own
lines logged regularly at INFO level by your own EPL to show some application KPIs.

To do this, create a ``.json`` configuration file containing a "userStatusLines" dictionary and pass it to the tool
using ``--config``. For example::

{
"userStatusLines":{
// This detects INFO level lines beginning with "JMS Status:"
"JMS Status:": {
// This prefix is added to the start of each alias to avoid clashes with other status KPIs
"keyPrefix":"jms.",
"key:alias":{
"s":"s=senders",
"r":"r=receivers",
"rRate":"rx /sec",
"sRate":"tx /sec",
"rWindow":"receive window",
"rRedel":"redelivered",
"rMaxDeliverySecs":"",
"rDupsDet":"",
"rDupIds":"",
"connErr":"",
"jvmMB":""
}},

"Persistence Status:": {
"keyPrefix":"p.",
"key:alias":{
"numSnapshots":"",
"lastSnapshotTime":"",
"snapshotWaitTimeEwmaMillis":"",
"commitTimeEwmaMillis":"",
"lastSnapshotRowsChangedEwma":""
}}
}
}

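Each extracted value is given a column name formed from the ``keyPrefix`` plus the alias (or the raw key when no
alias is given), so the configuration above produces columns such as "jms.rx /sec" and "p.numSnapshots"; these are
also the names used in the ``labels`` of the user-defined charts example below. A minimal sketch of that naming rule
(illustrative only, using a hypothetical helper)::

  # column/display name = keyPrefix + (alias if non-empty, else the raw status key)
  def column_name(keyPrefix, key, alias):
      return keyPrefix + (alias or key)

  column_name("jms.", "rRate", "rx /sec")    # -> "jms.rx /sec"
  column_name("p.", "numSnapshots", "")      # -> "p.numSnapshots"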

Any user-defined status lines should have the same form as the Correlator status lines: logged at INFO level and
similar to::

Some prefix: key1=value1 key2=value2 key3="value 3"

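As a rough illustration only (not the analyzer's own parser, which also converts numeric values and strips thousands
separators), a line of this form can be split into key/value pairs like this::

  import re

  # Rough sketch: values are either bare tokens or double-quoted strings containing spaces
  STATUS_ITEM = re.compile(r'(\w+)=("[^"]*"|\S+)')

  def parse_status_message(message):
      prefix, _, rest = message.partition(':')
      return {key: value.strip('"') for key, value in STATUS_ITEM.findall(rest)}

  parse_status_message('Some prefix: key1=value1 key2=value2 key3="value 3"')
  # -> {'key1': 'value1', 'key2': 'value2', 'key3': 'value 3'}
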
Technical detail: the frequency and timing of user-defined status lines may not match those of the main
"Correlator Status:" lines. The analyzer uses the main status lines for the timing, adding the most recently seen
user status values to each row, so the timing and line information always comes from the main status line.
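
A minimal sketch of that carry-forward behaviour (illustrative only, not the analyzer's implementation)::

  # The most recently seen user status values are merged into the row produced
  # for each main "Correlator Status:" line, which supplies the timing/line info.
  latest_user_values = {}

  def on_user_status_line(values):        # e.g. {'jms.rx /sec': 100.0}
      latest_user_values.update(values)

  def on_main_status_line(values):        # values from the main status line
      row = dict(values)                  # timing and line info come from here
      row.update(latest_user_values)      # plus whatever user values were last seen
      return row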

User-defined charts
-------------------
In addition to the standard charts, you can add charts containing a mix of user-defined and standard status values.
This is achieved using the JSON configuration file described above with a "userCharts" entry. For example::

{
"userStatusLines":{
// ...
},
"userCharts": {

// Each chart is described by "uniqueid": { "heading": "title", "labels": [keys], other options... }
"jms_rates":{
"heading":"JMS rates",
"labels":["jms.rx /sec", "jms.tx /sec"],
"colors":["red", "pink", "orange"],
"ylabel":"Events/sec",

// For big numbers this often looks better than exponential notation
"labelsKMB":true
},
// Colors are decided automatically by default, but can be overridden
// This example shows how to put some series onto a second y axis
"persistence":{
"heading":"Correlator persistence",
"labels":["p.numSnapshots", "p.snapshotWaitTimeEwmaMillis", "p.commitTimeEwmaMillis"],
"colors":["red", "green", "blue"],

"ylabel":"Time (ms)",
"y2label":"Number of snapshots",
"series": {"p.numSnapshots":{"axis":"y2"}}
}
}

}

Cumulocity
----------
If you're using Apama inside Cumulocity, download the log by using the App Switcher icon to go to **Administration**, then **Applications > Subscribed applications > Apama-ctrl-XXX**. Assuming Apama-ctrl is running, you'll see a **Logs** tab. Try to get the full log: click the ``|<<`` button to find the date of the first entry, then click **Download** and select the time range from that start date to the day after today.
129 changes: 74 additions & 55 deletions apamax/log_analyzer.py
@@ -315,13 +315,13 @@ def formatItem(self, item, columnDisplayName, missingItemValue='?'):
"""
try:
if item is None: return missingItemValue
if columnDisplayName == 'local datetime':
if columnDisplayName.endswith('local datetime'):
return item[:item.find('.')] # strip off seconds as excel misformats it if present
if item in [True,False]: return str(item).upper()
if isinstance(item, float) and item.is_integer() and abs(item)>=1000.0:
item = int(item) # don't show decimal points for large floats like 7000.0, for consistency with smaller values like 7 when shown in excel (weird excel rules)
if isinstance(item, int):
if columnDisplayName=='epoch secs':
if columnDisplayName.endswith('epoch secs'):
return f'{item}'
return f'{item:,}'
if isinstance(item, float):
@@ -586,6 +586,7 @@ def handleFileStarted(self, file, **extra):
# for handleRawStatusDict
self.columns = None # ordered dict of key:annotated_displayname
self.previousRawStatus = None # the previous raw status
self.userStatus = {}
file['errorsCount'] = file['warningsCount'] = 0

# for handleAnnotatedStatusDict summarization
@@ -608,8 +609,13 @@ def handleLine(self, file, line, previousLine, **extra):
if m.startswith(('Correlator Status: ', 'Status: sm')): # "Status: " is for very old versions e.g. 4.3
self.handleRawStatusLine(file=file, line=line)
return

level = line.level

for userStatusPrefix, userStatus in self.args.userStatusLines.items():
if m.startswith(userStatusPrefix):
self.handleRawStatusLine(file=file, line=line, userStatus=userStatus)
break

if level == 'W':
if m.startswith('Receiver '):
self.handleConnectionMessage(file, line)
@@ -646,7 +652,10 @@ def handleLine(self, file, line, previousLine, **extra):
)):
self.handleConnectionMessage(file, line)

def handleRawStatusLine(self, file, line, **extra):
def handleRawStatusLine(self, file, line, userStatus=None, **extra):
"""
Handles a raw status line which may be a correlator status line or a user-defined one
"""
m = line.message
d = collections.OrderedDict()
d['datetime'] = line.getDetails()['datetimestring']
@@ -655,15 +664,7 @@ def handleRawStatusLine(self, file, line, **extra):
d['epoch secs'] = line.getDateTime().replace(tzinfo=datetime.timezone.utc).timestamp()

d['line num'] = line.lineno

"""if kind==EVENT_JMS_STATUS_DICT:
if m.endswith('<waiting for onApplicationInitialized>'):
d['waitingForOnAppInit'] = True
m = m[:m.index('<waiting for onApplicationInitialized')-1]
else:
d['waitingForOnAppInit'] = False
"""

i = m.index(':')+2
mlen = len(m)
while i < mlen:
@@ -673,9 +674,9 @@ def handleRawStatusLine(self, file, line, **extra):
key+= m[i]
i += 1
if i == mlen:
# this can happen if (mysteriously) a line break character is missing at end of status line (seen in 10.3.3); better to limp on rather than throwing
log.warning(f'Ignoring invalid status log line {line.lineno}: {m}')
return
# this can happen if (mysteriously) a line break character is missing at end of status line (seen in 10.3.3); better to limp on rather than throwing; but ignore the <...> message we include at the end of JMS status lines
(log.debug if (key.startswith('<') and key.endswith('>')) else log.warning)(f'Ignoring the rest of status log line {line.lineno}; expected "=" but found end of line: "{key}"')
break # don't ignore the bits we already parsed out successfully
assert m[i] == '=', (m, repr(m[i]))
i+=1
if m[i]=='"':
@@ -688,7 +689,6 @@ def handleRawStatusLine(self, file, line, **extra):
if endchar != '"' or m[i] != ',': # if not a string, suppress thousands character
val += m[i]
i+=1
#if kind == EVENT_JMS_STATUS_DICT: key = 'jms.'+key
if endchar != '"':
try:
if '.' in val:
@@ -702,45 +702,18 @@ def handleRawStatusLine(self, file, line, **extra):
if not d: return

#log.debug('Extracted status line %s: %s', d)
self.handleRawStatusDict(file=file, line=line, status=d)

"""
also requires this in file started:
# for handleRawStatusLine
self.__jmsenabled = None
self.__previous = None # rawstatusdict
if self.__jmsenabled is None:
if self.__previous is None:
# don't know yet if JMS is enabled
self.__previous = d
return
if kind is EVENT_CORRELATOR_STATUS_DICT:
self.__jmsenabled = False # two consecutive non-JMS statuses means its not enabled
self.handleRawStatusDict(status=self.__previous, line=line)
self.__previous = None
else:
self.__jmsenabled = True
if self.__jmsenabled is False:
self.manager.publish(EVENT_COMBINED_STATUS_DICT, status=d, line=line)
if userStatus is not None:
# must do namespacing here since there could be multiple user-defined statuses and we don't want them to clash
prefix = userStatus['keyPrefix']
for k, alias in userStatus['key:alias'].items():
if k in d:
self.userStatus[prefix+(alias or k)] = d[k]
else:
if kind is EVENT_JMS_STATUS_DICT:
combined = collections.OrderedDict(d)
combined.update(self.__previous)
self.handleRawStatusDict(status=combined, line=line)
self.__previous = None
else:
assert self.__previous is None, self.__previous
self.__previous = d # will publish it once we get the JMS line immediately following
# nb: this algorithm means a file containing only one correlator status line would be ignored, but don't care about that case really
"""

self.handleRawStatusDict(file=file, line=line, status=d)

def handleRawStatusDict(self, file, line, status=None, **extra):
"""
Accepts a raw status dictionary and converts it to an annotated status
Accepts a raw correlator status dictionary and converts it to an annotated status
dict (unordered) whose keys match the columns returned by
decideColumns, adding in calculated values.
@@ -768,6 +741,13 @@ def decideColumns(status):
if k in allkeys:
columns[k] = k

# now add on any user-defined status keys; always add these regardless of whether they're yet set,
# since they may come from EPL code that hasn't been injected yet and we can't change the columns later
for user in self.args.userStatusLines.values():
for k, alias in user['key:alias'].items(): # aliasing for user-defined status lines happens in handleRawStatusLine
k = user['keyPrefix']+(alias or k)
columns[k] = k

return columns

self.columns = decideColumns(status)
@@ -859,6 +839,7 @@ def decideColumns(status):
assert False, 'Unknown generated key: %s'%k
else:
val = status.get(k, None)
if val is None: val = self.userStatus.get(k, None)
if display[k] in ['pm=resident MB', 'vm=virtual MB'] and val is not None:
val = val/1024.0 # kb to MB

@@ -917,7 +898,15 @@ def _updateStatusSummary(self, file, line, status):
file['totalStatusLinesInFile'] += 1
for k, v in status.items():
if v is None or isinstance(v, str): continue
if v < file['status-min'][k]: file['status-min'][k] = v
try:
if v < file['status-min'][k]: file['status-min'][k] = v
except Exception: # this happens for user-defined statuses which weren't initialized right at the start
if file['status-min'][k] is None:
file['status-min'][k] = v
file['status-max'][k] = v
file['status-sum'][k] = 0
else: raise

if v > file['status-max'][k]:
file['status-max'][k] = v
file['status-max'][k+'.line'] = line # also useful to have datetime/linenum for the maximum ones
@@ -1008,7 +997,8 @@ def calcmean(k):
delta = collections.OrderedDict()
delta['statistic'] = f'... delta: {display} - {prev["statistic"]}'
for k in status:
if isinstance(status[k], str) or k in ['seconds', 'line num', 'interval secs'] or k.endswith('.line'):
if (isinstance(status[k], str) or k in ['seconds', 'line num', 'interval secs'] or k.endswith('.line')
or status[k] is None or prev[k] is None or isinstance(prev[k], str)):
delta[k] = ''
else:
try:
@@ -2002,7 +1992,7 @@ def writeOverviewHTMLForAllFiles(self, overviewHTML, **extra):
// workaround for the bug where Dygraph.prototype.setColors_ un-sets color for any series where visibility=false;
// this workaround gives correct color if configured using options{colors:[...]} and falls back to transparent if not
series.dashHTML = series.dashHTML.replace("color: undefined;", "color: "+(dygraph.getOption('colors')[seriesIndex] || "rgba(255,255,255,0.0)")+";");
series.dashHTML = series.dashHTML.replace("color: undefined;", "color: "+(dygraph.getColors()[seriesIndex] || "rgba(255,255,255,0.0)")+";");
if (showvalues && series != undefined && series.y != undefined) { labeledData += ': ' + series.yHTML; }
if (series.isHighlighted) { labeledData = '<b>' + labeledData + '</b>'; }
@@ -2185,6 +2175,9 @@ def __init__(self, analyzerFactory=LogAnalyzer):
self.argparser.add_argument('--json', action='store_true',
help='Advanced/debugging option to additionally write output in JSON format suitable for processing by scripts.')

self.argparser.add_argument('--config', metavar="FILE.json", type=str,
help='Configure the analyzer for advanced functionality such as custom/user-supplied log line extraction.')

self.argparser.add_argument('--XmaxUniqueWarnOrErrorLines', metavar='INT', default=1000, type=int,
help='Advanced option to put an upper limit on the number of unique warn/error log lines that will be held in memory. Specify 0 to disable warn/error line tracking.')
self.argparser.add_argument('--XmaxSampleWarnOrErrorLines', metavar='INT', default=5, type=int,
@@ -2218,6 +2211,30 @@ def main(self, args):
globbedpaths = [toLongPathSafe(p) for p in globbedpaths]
globbedpaths.sort() # best we can do until when start reading them - hopefully puts the latest one at the end

if args.config:
with open(args.config, 'rb') as f:
jsonbytes = f.read()
# permit # and // comments in the JSON file for added usability
jsonbytes = re.sub(b'^[\t ]*(#|//).*', b'', jsonbytes, flags=re.MULTILINE)
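# e.g. whole-line comments such as the "// ..." lines in the README examples are blanked
# out before json.loads; note that only whole-line comments are stripped, not trailing ones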
for k, v in json.loads(jsonbytes).items():
if k == 'userStatusLines':
args.userStatusLines = v
# sanity check it
columns = {COLUMN_DISPLAY_NAMES[k] or k for k in COLUMN_DISPLAY_NAMES} # existing display names, for detecting clashes
for userStatusPrefix, userStatus in v.items():
if not userStatusPrefix.endswith(':'): raise UserError('userStatus prefixes should end with a ":"')
for k, alias in userStatus['key:alias'].items():
alias = userStatus['keyPrefix']+(alias or k)
if alias in columns: raise UserError(f"User status line '{userStatusPrefix}' contains display name '{alias}' which is already in use; consider using keyPrefix to ensure this status line doesn't conflict with display names from others")
columns.add(alias)
elif k == 'userCharts':
userCharts = v # allow overriding existing charts if desired
else:
raise UserError(f'Unknown key in config file: "{k}"')
else:
args.userStatusLines = {}
userCharts = {}

if not globbedpaths: raise UserError('No log files specified')

if not args.output:
@@ -2295,6 +2312,8 @@ def addDirectory(root):
logpaths.add(p)

manager = self.analyzerFactory(args)
manager.CHARTS.update(userCharts) # allow overriding existing charts if desired

manager.processFiles(sorted(list(logpaths)))

duration = time.time()-duration