Task #3485 » ingest-res-usage.py

Peter Gusev, 03/02/2016 10:07 AM

 
#!/usr/bin/env python
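# samples CPU and memory usage of ndncon and/or NFD (via pgrep + ps) and
# ingests the measurements into OpenTSDB or InfluxDB (see usage() below)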

import requests
import sys
import re
import time
import getopt
import json
import os
import subprocess
from copy import deepcopy
from influxdb import InfluxDBClient

resourcesToTrack = {'cpu_pct':'%cpu', 'mem_pct':'%mem', 'virtual_kb':'vsz', 'resident_kb':'rss'}

#******************************************************************************
def runCmd(cmd):
    resultStr = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout.read()
    return resultStr

def getProcessPid(processName):
    getPidCmdTemplate = 'pgrep {0} | tr "\\n" "," | sed \'s/,$//\''

    cmd = getPidCmdTemplate.format(processName)
    pidStr = runCmd(cmd)
    if len(pidStr) > 0:
        # pgrep may return several PIDs joined by commas; use the first one
        return int(pidStr.split(',')[0])
    return None

def getProcessResources(processName):
    global resourcesToTrack
    getResourcesCmdTemplate = 'ps -h -p {0} -o {1} | awk \'NR>1\''

    pid = getProcessPid(processName)
    if pid:
        cmd = getResourcesCmdTemplate.format(pid, ','.join(resourcesToTrack.values()))
        resourcesStr = runCmd(cmd)
        resources = filter(lambda x: len(x) > 0, resourcesStr.split(' '))
        resDict = {}
        for i in range(0, len(resourcesToTrack)):
            # keys() and values() stay aligned as long as the dict is not modified
            label = resourcesToTrack.keys()[i]
            value = float(resources[i])
            resDict[label] = value
        return resDict
    return None

#******************************************************************************
def ingestProcessResources(adaptor, processName, username, hubLabel, resDict):
    # copy the template so the shared class-level dict is not mutated
    metric = deepcopy(IngestAdaptor.metricGenericTemplate)
    metric['tags'] = {'process':processName, 'user':username, 'hub':hubLabel}
    metric['timestamp'] = adaptor.fromUnixTimestamp(time.time())

    for m in resDict:
        md = deepcopy(metric)
        md['metric'] = m
        md['value'] = resDict[m]
        adaptor.metricFunc(md)

def ingestResources(adaptor, username, hubLabel, processResDict):
    for processName in processResDict:
        if processResDict[processName]:
            ingestProcessResources(adaptor, processName, username, hubLabel, processResDict[processName])

def run(username, hubLabel, isTrackingNdncon, isTrackingNfd, ingestAdaptor):
    frequency = 1.
    ndnconName = "ndncon"
    nfdName = "nfd"
    ndnconResources = None
    nfdResources = None
    while True:
        if isTrackingNdncon:
            ndnconResources = getProcessResources(ndnconName)
        if isTrackingNfd:
            nfdResources = getProcessResources(nfdName)

        ingestResources(ingestAdaptor, username, hubLabel, {ndnconName:ndnconResources, nfdName:nfdResources})
        time.sleep(1/frequency)

#******************************************************************************
# classes borrowed & adapted from https://github.com/peetonn/ndnrtc-tools
#******************************************************************************
class IngestAdaptor(object):
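    # base class: buffers generic metric dicts and flushes them in batches via
    # writeBatch(); subclasses implement the actual backend-specific writes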
    metricGenericTemplate = { 'metric' : None, 'timestamp': None, 'value': None, 'fields': {}, 'tags': {}}
    startTimestamp = None
    baseTimestamp = None
    timeOffset = 0
    writeCtr = 0
    lastWrite = 0
    metrics = []
    batchSize = 10
    dryRun = False
    metricWriteCounter = {}

    def __init__(self, timeOffset = 0):
        self.timeOffset = timeOffset
        self.baseTimestamp = 0

    def connect(self):
        pass

    def timeFunc(self, match):
        return 0

    def fromUnixTimestamp(self, unixTimestamp):
        return 0

    def uniquefyTimestamps(self, metrics):
        # nudge duplicate timestamps forward so points do not overwrite each other
        timestamps = {}
        for metric in metrics:
            k = metric['timestamp']
            if k in timestamps.keys():
                timestamps[k] += 1
                metric['timestamp'] += timestamps[k]
                k = metric['timestamp']
            timestamps[k] = 0

    def writeBatch(self, metrics):
        pass

    def metricFunc(self, json):
        self.metrics.append(json)
        if len(self.metrics) == self.batchSize:
            self.writeBatch(self.metrics)
            self.writeCtr += self.batchSize
            self.metrics = []
            if self.writeCtr-self.lastWrite >= 1000 and not self.dryRun:
                print "wrote "+str(self.writeCtr-self.lastWrite)+" measurements. "+str(self.writeCtr)+" total."
                self.lastWrite = self.writeCtr
        if not json['metric'] in self.metricWriteCounter.keys():
            self.metricWriteCounter[json['metric']] = 0
        self.metricWriteCounter[json['metric']] += 1

    def finalize(self):
        if len(self.metrics) > 0:
            self.writeBatch(self.metrics)
            self.writeCtr += len(self.metrics)
            self.metrics = []
        print("wrote "+str(self.writeCtr)+" measurements total")
        for key in self.metricWriteCounter.keys():
            print(key+': '+str(self.metricWriteCounter[key]))

    @staticmethod
    def printMetric(json):
        sys.stdout.write(str(json['metric'])+" "+str(json['timestamp'])+" "+str(json['value']))
        for key in json['tags'].keys():
            sys.stdout.write(' '+str(key)+'='+str(json['tags'][key]))
        for key in json['fields'].keys():
            sys.stdout.write(' '+str(key)+'='+str(json['fields'][key]))
        sys.stdout.write('\n')

    @staticmethod
    def printMetrics(metrics):
        for m in metrics:
            IngestAdaptor.printMetric(m)

class InfluxAdaptor(IngestAdaptor):
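    # writes batches to InfluxDB through the influxdb client library
    # (timestamps are converted to nanoseconds, see fromUnixTimestamp below)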
    influxClient = None
    influxJsonTemplate = { "measurement": None, "time": None, "fields": { "value": None }, "tags": {} }
    batchSize = 1000
    metrics = []

    def __init__(self, user, password, dbname, timeOffset = 0, host = 'localhost', port = 8086):
        super(InfluxAdaptor, self).__init__(timeOffset)
        self.connect(host, port, user, password, dbname)
        self.baseTimestamp = 1234560000000 # millisec

    def connect(self, host, port, user, password, dbname):
        self.influxClient = InfluxDBClient(host, port, user, password, dbname)

    def timeFunc(self, match):
        timestamp = int(match.group('timestamp'))
        if not self.startTimestamp:
            self.startTimestamp = timestamp
        unixTimestamp = self.baseTimestamp + (timestamp - self.startTimestamp)
        return unixTimestamp*1000000 # nanosec

    def fromUnixTimestamp(self, unixTimestamp):
        return int(unixTimestamp*1000000000) # nanosec

    def writeBatch(self, metrics):
        self.uniquefyTimestamps(metrics)
        if self.dryRun:
            IngestAdaptor.printMetrics(metrics)
        else:
            batch = []
            for m in metrics:
                batch.append(self.toInfluxJson(m))
            self.influxClient.write_points(batch)

    def toInfluxJson(self, genericJson):
        influxJson = deepcopy(self.influxJsonTemplate)
        influxJson['measurement'] = genericJson['metric']
        influxJson['time'] = genericJson['timestamp']
        influxJson['fields']['value'] = genericJson['value']
        for key in genericJson['fields'].keys():
            influxJson['fields'][key] = genericJson['fields'][key]
        for key in genericJson['tags'].keys():
            influxJson['tags'][key] = genericJson['tags'][key]
        return influxJson

class TsdbAdaptor(IngestAdaptor):
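    # POSTs batches to OpenTSDB's HTTP /api/put endpoint
    # (timestamps are converted to milliseconds, see fromUnixTimestamp below)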
    tsdbJsonTemplate = { "metric": None, "timestamp": None, "value": None, "tags": {} }

    def __init__(self, timeOffset = 0, tsdbUri = 'http://localhost:4242/api/put?details', batchSize = 20):
        super(TsdbAdaptor, self).__init__(timeOffset)
        self.tsdbUri = tsdbUri
        self.batchSize = batchSize
        self.baseTimestamp = 1234560000000

    def timeFunc(self, match):
        timestamp = int(match.group('timestamp'))
        if not self.startTimestamp:
            self.startTimestamp = timestamp
        unixTimestamp = self.baseTimestamp + (timestamp - self.startTimestamp) + self.timeOffset
        return unixTimestamp

    def fromUnixTimestamp(self, unixTimestamp):
        return int(unixTimestamp*1000) # millisec

    def writeBatch(self, metrics):
        if self.dryRun:
            IngestAdaptor.printMetrics(metrics)
        else:
            tsdbMetrics = []
            for m in metrics:
                tsdbMetrics.append(self.toTsdbJson(m))
            response = requests.post(url=self.tsdbUri, data=json.dumps(tsdbMetrics), headers={'content-type': 'application/json'})
            if response.status_code != 200:
                print('return code '+str(response.status_code)+\
                      '. aborting. info: '+response.text)
                exit(1)

    def toTsdbJson(self, genericJson):
        tsdbJson = deepcopy(self.tsdbJsonTemplate)
        tsdbJson['metric'] = genericJson['metric']
        tsdbJson['value'] = genericJson['value']
        tsdbJson['timestamp'] = genericJson['timestamp']
        for key in genericJson['fields'].keys():
            tsdbJson['tags'][key] = genericJson['fields'][key]
        for key in genericJson['tags'].keys():
            tsdbJson['tags'][key] = genericJson['tags'][key]
        return tsdbJson

#******************************************************************************
def usage():
print "usage: "+sys.argv[0]+" --username=<user name> --hub=<home hub label> [--no-ndncon, --no-nfd]"
print ""
print "the tool allows to track CPU and memory resources consumed by ndncon and/or NFD and ingest "
print "this data into remote data base for real-time and historical analysis"
print ""
print "\texample:"
print "\t\t"+sys.argv[0]+" --username=peter --hub=remap"
print "\toptions:"
print "\t\t--no-nfd:\ttrack ndncon resources only"
print "\t\t--no-ndncon:\ttrack NFD resources only"
print "\texamples:"
print "\t\ttrack CPU and memory consumption from ndncon only:"
print "\t\t\t"+sys.argv[0]+" --username=peter --hub=remap --no-nfd"
print "\t\ttrack CPU and memory consumption from NFD only:"
print "\t\t\t"+sys.argv[0]+" --username=peter --hub=remap --no-ndncon"

def main():
    global resourcesToTrack

    try:
        opts, args = getopt.getopt(sys.argv[1:], "", ["username=", "hub=", "no-ndncon", "no-nfd", "dry-run", "iuser=", "ipassword=", "idb=", "influx-adaptor", "port=", "host="])
    except getopt.GetoptError as err:
        print str(err)
        usage()
        exit(2)

    # ingestion parameters
    port = None
    host = 'localhost'
    tsdbPort = 4242
    influxPort = 8086
    useTsdbAdaptor = True
    dryRun = False
    influx_username = None
    influx_password = None
    influx_dataBaseName = None

    username = None
    hubLabel = None
    trackNdncon = True
    trackNfd = True
    for o, a in opts:
        if o == "--username":
            username = a
        elif o == "--hub":
            hubLabel = a
        elif o == "--no-ndncon":
            trackNdncon = False
        elif o == "--no-nfd":
            trackNfd = False
        elif o == "--dry-run":
            dryRun = True
        elif o == "--port":
            port = int(a)
        elif o == "--host":
            host = a
        elif o == "--influx-adaptor":
            useTsdbAdaptor = False
        elif o == "--iuser":
            influx_username = a
        elif o == "--ipassword":
            influx_password = a
        elif o == "--idb":
            influx_dataBaseName = a
        else:
            assert False, "unhandled option "+o
    if not (username and hubLabel):
        usage()
        exit(2)

    if not useTsdbAdaptor and not (influx_username and influx_password and influx_dataBaseName):
        print influx_username, influx_password, influx_dataBaseName
        print "asked for influx adaptor, but didn't provide username, password and db name. aborting"
        exit(2)

    if useTsdbAdaptor:
        if port: tsdbPort = port
        ingestAdaptor = TsdbAdaptor(timeOffset=0, tsdbUri='http://{0}:{1}/api/put?details'.format(host, tsdbPort))
    else:
        if port: influxPort = port
        ingestAdaptor = InfluxAdaptor(user=influx_username, password=influx_password, dbname=influx_dataBaseName, timeOffset=0, host=host, port=influxPort)

    ingestAdaptor.batchSize = len(resourcesToTrack)
    ingestAdaptor.dryRun = dryRun

    if trackNdncon or trackNfd:
        run(username, hubLabel, trackNdncon, trackNfd, ingestAdaptor)

if __name__ == '__main__':
    main()