#!/usr/bin/env python
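"""
Track CPU and memory usage of ndncon and/or NFD via `ps` and ingest the
samples into a remote OpenTSDB or InfluxDB instance for real-time and
historical analysis.
"""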

import requests
import sys
import time
import getopt
import json
import subprocess

from copy import deepcopy
from influxdb import InfluxDBClient


# metric label (as stored in the database) -> `ps` output column name
resourcesToTrack = {'cpu_pct': '%cpu', 'mem_pct': '%mem', 'virtual_kb': 'vsz', 'resident_kb': 'rss'}


#******************************************************************************
def runCmd(cmd):
    # run a shell command and return its stdout as a string
    return subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout.read()


def getProcessPid(processName):
    # pgrep prints one PID per line; collapse them into a comma-separated list
    getPidCmdTemplate = 'pgrep {0} | tr "\\n" "," | sed \'s/,$//\''

    cmd = getPidCmdTemplate.format(processName)
    pidStr = runCmd(cmd)
    if len(pidStr) > 0:
        # if several instances match, track the first one; int() would fail
        # on a comma-separated list
        return int(pidStr.split(',')[0])
    return None


def getProcessResources(processName):
    global resourcesToTrack
    # query ps for the tracked columns of the given PID; awk strips the header line
    getResourcesCmdTemplate = 'ps -h -p {0} -o {1} | awk \'NR>1\''

    pid = getProcessPid(processName)
    if pid:
        cmd = getResourcesCmdTemplate.format(pid, ','.join(resourcesToTrack.values()))
        resourcesStr = runCmd(cmd)
        resources = resourcesStr.split()
        resDict = {}
        # in Python 2, dict.keys() and dict.values() enumerate entries in the
        # same order, so column i of the ps output matches the i-th key
        for i in range(len(resourcesToTrack)):
            label = resourcesToTrack.keys()[i]
            value = float(resources[i])
            resDict[label] = value
        return resDict
    return None
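
# a successful call to getProcessResources() returns a dict like the
# following (values illustrative):
#   {'cpu_pct': 12.5, 'mem_pct': 3.1, 'virtual_kb': 2540972.0, 'resident_kb': 48212.0}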


#******************************************************************************
def ingestProcessResources(adaptor, processName, username, hubLabel, resDict):
    # deep-copy the template; assigning it directly would mutate the shared
    # class attribute and leak tags/timestamps between calls
    metric = deepcopy(IngestAdaptor.metricGenericTemplate)
    metric['tags'] = {'process': processName, 'user': username, 'hub': hubLabel}
    metric['timestamp'] = adaptor.fromUnixTimestamp(time.time())

    for m in resDict:
        md = deepcopy(metric)
        md['metric'] = m
        md['value'] = resDict[m]
        adaptor.metricFunc(md)


def ingestResources(adaptor, username, hubLabel, processResDict):
    for processName in processResDict:
        if processResDict[processName]:
            ingestProcessResources(adaptor, processName, username, hubLabel, processResDict[processName])


def run(username, hubLabel, isTrackingNdncon, isTrackingNfd, ingestAdaptor):
    frequency = 1.  # sampling passes per second
    ndnconName = "ndncon"
    nfdName = "nfd"
    ndnconResources = None
    nfdResources = None
    while True:
        if isTrackingNdncon:
            ndnconResources = getProcessResources(ndnconName)
        if isTrackingNfd:
            nfdResources = getProcessResources(nfdName)

        ingestResources(ingestAdaptor, username, hubLabel, {ndnconName: ndnconResources, nfdName: nfdResources})
        time.sleep(1/frequency)


#******************************************************************************
# classes borrowed & adapted from https://github.com/peetonn/ndnrtc-tools
#******************************************************************************
class IngestAdaptor(object):
    metricGenericTemplate = {'metric': None, 'timestamp': None, 'value': None, 'fields': {}, 'tags': {}}
    startTimestamp = None
    baseTimestamp = None
    timeOffset = 0
    writeCtr = 0
    lastWrite = 0
    metrics = []
    batchSize = 10
    dryRun = False
    metricWriteCounter = {}

    def __init__(self, timeOffset=0):
        self.timeOffset = timeOffset
        self.baseTimestamp = 0

    def connect(self):
        pass

    def timeFunc(self, match):
        return 0

    def fromUnixTimestamp(self, unixTimestamp):
        return 0

    # bump duplicate timestamps so later points don't overwrite earlier ones
    def uniquefyTimestamps(self, metrics):
        timestamps = {}
        for metric in metrics:
            k = metric['timestamp']
            if k in timestamps.keys():
                timestamps[k] += 1
                metric['timestamp'] += timestamps[k]
                k = metric['timestamp']
            timestamps[k] = 0

    def writeBatch(self, metrics):
        pass

    def metricFunc(self, json):
        self.metrics.append(json)
        if len(self.metrics) == self.batchSize:
            self.writeBatch(self.metrics)
            self.writeCtr += self.batchSize
            self.metrics = []
            if self.writeCtr - self.lastWrite >= 1000 and not self.dryRun:
                print "wrote " + str(self.writeCtr - self.lastWrite) + " measurements. " + str(self.writeCtr) + " total."
                self.lastWrite = self.writeCtr
        if not json['metric'] in self.metricWriteCounter.keys():
            self.metricWriteCounter[json['metric']] = 0
        self.metricWriteCounter[json['metric']] += 1

    # flush whatever is left in the buffer and print per-metric totals
    def finalize(self):
        if len(self.metrics) > 0:
            self.writeBatch(self.metrics)
            self.writeCtr += len(self.metrics)
            self.metrics = []
        print "wrote " + str(self.writeCtr) + " measurements total"
        for key in self.metricWriteCounter.keys():
            print key + ': ' + str(self.metricWriteCounter[key])

    @staticmethod
    def printMetric(json):
        sys.stdout.write(str(json['metric']) + " " + str(json['timestamp']) + " " + str(json['value']))
        for key in json['tags'].keys():
            sys.stdout.write(' ' + str(key) + '=' + str(json['tags'][key]))
        for key in json['fields'].keys():
            sys.stdout.write(' ' + str(key) + '=' + str(json['fields'][key]))
        sys.stdout.write('\n')

    @staticmethod
    def printMetrics(metrics):
        for m in metrics:
            IngestAdaptor.printMetric(m)
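
# in dry-run mode printMetric() emits one line per point, e.g. (illustrative):
#   cpu_pct 1234560000000000000 12.5 process=ndncon user=peter hub=remap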


class InfluxAdaptor(IngestAdaptor):
    influxClient = None
    influxJsonTemplate = {"measurement": None, "time": None, "fields": {"value": None}, "tags": {}}
    batchSize = 1000
    metrics = []

    def __init__(self, user, password, dbname, timeOffset=0, host='localhost', port=8086):
        super(InfluxAdaptor, self).__init__(timeOffset)
        self.connect(host, port, user, password, dbname)
        self.baseTimestamp = 1234560000000  # millisec

    def connect(self, host, port, user, password, dbname):
        self.influxClient = InfluxDBClient(host, port, user, password, dbname)

    def timeFunc(self, match):
        timestamp = int(match.group('timestamp'))
        if not self.startTimestamp:
            self.startTimestamp = timestamp
        unixTimestamp = self.baseTimestamp + (timestamp - self.startTimestamp)
        return unixTimestamp * 1000000  # nanosec

    def fromUnixTimestamp(self, unixTimestamp):
        return int(unixTimestamp * 1000000000)  # nanosec

    def writeBatch(self, metrics):
        self.uniquefyTimestamps(metrics)
        if self.dryRun:
            IngestAdaptor.printMetrics(metrics)
        else:
            batch = []
            # iterate over the passed-in batch, not self.metrics, so that both
            # metricFunc() and finalize() write exactly the metrics they hand in
            for m in metrics:
                batch.append(self.toInfluxJson(m))
            self.influxClient.write_points(batch)

    def toInfluxJson(self, genericJson):
        influxJson = deepcopy(self.influxJsonTemplate)
        influxJson['measurement'] = genericJson['metric']
        influxJson['time'] = genericJson['timestamp']
        influxJson['fields']['value'] = genericJson['value']
        for key in genericJson['fields'].keys():
            influxJson['fields'][key] = genericJson['fields'][key]
        for key in genericJson['tags'].keys():
            influxJson['tags'][key] = genericJson['tags'][key]
        return influxJson
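
# a point handed to write_points() ends up shaped like this (values illustrative):
#   {"measurement": "cpu_pct", "time": 1234560000000000000,
#    "fields": {"value": 12.5},
#    "tags": {"process": "ndncon", "user": "peter", "hub": "remap"}}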


class TsdbAdaptor(IngestAdaptor):
    tsdbJsonTemplate = {"metric": None, "timestamp": None, "value": None, "tags": {}}

    def __init__(self, timeOffset=0, tsdbUri='http://localhost:4242/api/put?details', batchSize=20):
        super(TsdbAdaptor, self).__init__(timeOffset)
        self.tsdbUri = tsdbUri
        self.batchSize = batchSize
        self.baseTimestamp = 1234560000000  # millisec

    def timeFunc(self, match):
        timestamp = int(match.group('timestamp'))
        if not self.startTimestamp:
            self.startTimestamp = timestamp
        unixTimestamp = self.baseTimestamp + (timestamp - self.startTimestamp) + self.timeOffset
        return unixTimestamp

    def fromUnixTimestamp(self, unixTimestamp):
        return int(unixTimestamp * 1000)  # millisec

    def writeBatch(self, metrics):
        if self.dryRun:
            IngestAdaptor.printMetrics(metrics)
        else:
            tsdbMetrics = []
            for m in metrics:
                tsdbMetrics.append(self.toTsdbJson(m))
            response = requests.post(url=self.tsdbUri, data=json.dumps(tsdbMetrics), headers={'content-type': 'application/json'})
            if response.status_code != 200:
                print 'return code ' + str(response.status_code) + '. aborting. info: ' + response.text
                sys.exit(1)

    def toTsdbJson(self, genericJson):
        tsdbJson = deepcopy(self.tsdbJsonTemplate)
        tsdbJson['metric'] = genericJson['metric']
        tsdbJson['value'] = genericJson['value']
        tsdbJson['timestamp'] = genericJson['timestamp']
        # OpenTSDB has no separate fields concept, so fields become tags
        for key in genericJson['fields'].keys():
            tsdbJson['tags'][key] = genericJson['fields'][key]
        for key in genericJson['tags'].keys():
            tsdbJson['tags'][key] = genericJson['tags'][key]
        return tsdbJson
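
# each batch POSTed to OpenTSDB's /api/put is a JSON array of objects like
# this one (values illustrative):
#   {"metric": "mem_pct", "timestamp": 1234560000000, "value": 3.1,
#    "tags": {"process": "nfd", "user": "peter", "hub": "remap"}}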


#******************************************************************************
def usage():
    print "usage: " + sys.argv[0] + " --username=<user name> --hub=<home hub label> [--no-ndncon, --no-nfd]"
    print ""
    print "the tool tracks CPU and memory resources consumed by ndncon and/or NFD and ingests"
    print "this data into a remote database for real-time and historical analysis"
    print ""
    print "\texample:"
    print "\t\t" + sys.argv[0] + " --username=peter --hub=remap"
    print "\toptions:"
    print "\t\t--no-nfd:\ttrack ndncon resources only"
    print "\t\t--no-ndncon:\ttrack NFD resources only"
    print "\t\t--dry-run:\tprint metrics to stdout instead of ingesting them"
    print "\t\t--host=, --port=:\tdatabase host and port (default: localhost, 4242 for OpenTSDB, 8086 for InfluxDB)"
    print "\t\t--influx-adaptor:\tingest into InfluxDB instead of OpenTSDB"
    print "\t\t--iuser=, --ipassword=, --idb=:\tInfluxDB user, password and database name (required with --influx-adaptor)"
    print "\texamples:"
    print "\t\ttrack CPU and memory consumption of ndncon only:"
    print "\t\t\t" + sys.argv[0] + " --username=peter --hub=remap --no-nfd"
    print "\t\ttrack CPU and memory consumption of NFD only:"
    print "\t\t\t" + sys.argv[0] + " --username=peter --hub=remap --no-ndncon"


def main():
    global resourcesToTrack

    try:
        opts, args = getopt.getopt(sys.argv[1:], "", ["username=", "hub=", "no-ndncon", "no-nfd", "dry-run", "iuser=", "ipassword=", "idb=", "influx-adaptor", "port=", "host="])
    except getopt.GetoptError as err:
        print str(err)
        usage()
        sys.exit(2)

    # ingestion parameters
    port = None
    host = 'localhost'
    tsdbPort = 4242
    influxPort = 8086
    useTsdbAdaptor = True
    dryRun = False
    influx_username = None
    influx_password = None
    influx_dataBaseName = None

    username = None
    hubLabel = None
    trackNdncon = True
    trackNfd = True
    for o, a in opts:
        # compare options with ==; `o in ("--username")` is a substring test
        # against a plain string, not a tuple membership test
        if o == "--username":
            username = a
        elif o == "--hub":
            hubLabel = a
        elif o == "--no-ndncon":
            trackNdncon = False
        elif o == "--no-nfd":
            trackNfd = False
        elif o == "--dry-run":
            dryRun = True
        elif o == "--port":
            port = int(a)
        elif o == "--host":
            host = a
        elif o == "--influx-adaptor":
            useTsdbAdaptor = False
        elif o == "--iuser":
            influx_username = a
        elif o == "--ipassword":
            influx_password = a
        elif o == "--idb":
            influx_dataBaseName = a
        else:
            assert False, "unhandled option " + o

    if not (username and hubLabel):
        usage()
        sys.exit(2)

    if not useTsdbAdaptor and not (influx_username and influx_password and influx_dataBaseName):
        print "asked for influx adaptor, but didn't provide username, password and db name. aborting"
        sys.exit(2)

    if useTsdbAdaptor:
        if port: tsdbPort = port
        ingestAdaptor = TsdbAdaptor(timeOffset=0, tsdbUri='http://{0}:{1}/api/put?details'.format(host, tsdbPort))
    else:
        if port: influxPort = port
        ingestAdaptor = InfluxAdaptor(user=influx_username, password=influx_password, dbname=influx_dataBaseName, timeOffset=0, host=host, port=influxPort)

    # flush a batch after each process's full set of resource samples
    ingestAdaptor.batchSize = len(resourcesToTrack)
    ingestAdaptor.dryRun = dryRun

    if trackNdncon or trackNfd:
        run(username, hubLabel, trackNdncon, trackNfd, ingestAdaptor)


if __name__ == '__main__':
    main()