Project

General

Profile

Task #3485 » ingest-res-usage.py

Peter Gusev, 03/02/2016 10:07 AM

 
1
#!/usr/bin/env python
2

    
3
import requests
4
import sys
5
import re
6
import time
7
import getopt
8
import json
9
import os
10
import subprocess
11
import time
12
from copy import deepcopy
13
from influxdb import InfluxDBClient
14

    
15
# Maps the metric name reported to the database (key) to the `ps -o` column
# keyword the value is read from (value).
resourcesToTrack = {'cpu_pct':'%cpu', 'mem_pct':'%mem', 'virtual_kb':'vsz', 'resident_kb':'rss'}
16

    
17
#******************************************************************************
18
def runCmd(cmd):
	"""Run `cmd` through the shell and return everything it wrote to stdout.

	cmd -- shell command line; it is executed with shell=True because the
	       callers in this file build pipelines.

	Returns the raw stdout contents (empty when the command prints nothing).
	"""
	process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
	# communicate() reads to EOF, closes the pipe and waits for the child,
	# so no defunct processes or open descriptors are left behind.
	resultStr = process.communicate()[0]
	return resultStr
21

    
22
def getProcessPid(processName):
	"""Return the PID of a running process matched by `pgrep`, or None.

	processName -- pattern handed to `pgrep` unchanged.

	Returns an int PID, or None when `pgrep` prints nothing. When several
	processes match, the first PID printed by `pgrep` is returned: a
	comma-joined list of PIDs cannot be converted by int().
	"""
	# pgrep prints one PID per line; split() without arguments discards the
	# newlines and works for both str and bytes output.
	pids = runCmd('pgrep {0}'.format(processName)).split()
	if not pids:
		return None
	return int(pids[0])
30

    
31
def getProcessResources(processName):
	"""Sample the tracked resources of a process via `ps`.

	processName -- pattern used to look up the process PID.

	Returns a dict mapping every key of `resourcesToTrack` to a float, or
	None when the process is not running or `ps` produced no usable sample
	(for instance because the process exited between the PID lookup and
	the `ps` call).
	"""
	# NOTE(review): awk 'NR>1' drops the first output line - presumably the
	# header that `ps` still prints on the target platform; verify there.
	getResourcesCmdTemplate = 'ps -h -p {0} -o {1} | awk \'NR>1\''

	pid = getProcessPid(processName)
	if pid is None:
		return None

	# Freeze the label order once so columns and labels are paired reliably.
	labels = list(resourcesToTrack.keys())
	columns = ','.join(resourcesToTrack[label] for label in labels)
	resourcesStr = runCmd(getResourcesCmdTemplate.format(pid, columns))

	# Whitespace split ignores repeated blanks and the trailing newline.
	fields = resourcesStr.split()
	if len(fields) < len(labels):
		return None
	try:
		return dict((label, float(field)) for label, field in zip(labels, fields))
	except ValueError:
		# Non-numeric column: skip this sample instead of killing the tracker.
		return None
47

    
48
#******************************************************************************
49
def ingestProcessResources(adaptor, processName, username, hubLabel, resDict):
	"""Hand one metric per sampled resource of a process to `adaptor`.

	adaptor     -- ingest adaptor; its fromUnixTimestamp() and metricFunc()
	               are used.
	processName -- stored in the 'process' tag.
	username    -- stored in the 'user' tag.
	hubLabel    -- stored in the 'hub' tag.
	resDict     -- dict of metric name -> value.

	All metrics produced by one call carry the same timestamp.
	"""
	tags = {'process':processName, 'user':username, 'hub':hubLabel}
	timestamp = adaptor.fromUnixTimestamp(time.time())

	for name, value in resDict.items():
		# Copy the template for every metric: filling the class-level
		# dictionary in place would leak tags and timestamps into the
		# template shared by all users of IngestAdaptor.
		md = deepcopy(IngestAdaptor.metricGenericTemplate)
		md['tags'] = dict(tags)
		md['timestamp'] = timestamp
		md['metric'] = name
		md['value'] = value
		adaptor.metricFunc(md)
59

    
60
def ingestResources(adaptor, username, hubLabel, processResDict):
	"""Ingest the samples of every process that actually has one.

	processResDict maps a process name to its resource dict; entries whose
	value is empty or None are skipped.
	"""
	for name, sample in processResDict.items():
		if not sample:
			continue
		ingestProcessResources(adaptor, name, username, hubLabel, sample)
64

    
65
def run(username, hubLabel, isTrackingNdncon, isTrackingNfd, ingestAdaptor):
	"""Sample the tracked processes and ingest the results, forever.

	Each iteration samples "ndncon" and/or "nfd" (depending on the two
	flags), passes the samples to ingestResources() and then sleeps for
	one sampling period. The function never returns.
	"""
	ndnconName, nfdName = "ndncon", "nfd"
	frequency = 1.
	period = 1/frequency

	while True:
		# A process that is not tracked always contributes None.
		ndnconResources = getProcessResources(ndnconName) if isTrackingNdncon else None
		nfdResources = getProcessResources(nfdName) if isTrackingNfd else None

		samples = {ndnconName:ndnconResources, nfdName:nfdResources}
		ingestResources(ingestAdaptor, username, hubLabel, samples)
		time.sleep(period)
79

    
80
#******************************************************************************
81
# classes borrowed & adapted from https://github.com/peetonn/ndnrtc-tools
82
#******************************************************************************
83
class IngestAdaptor(object):
	"""Base class that buffers generic metric dicts and writes them in batches.

	Subclasses override connect(), timeFunc(), fromUnixTimestamp() and
	writeBatch(); the implementations here are no-op placeholders.
	"""
	# Shape of a generic metric; copy it before filling it in.
	metricGenericTemplate = { 'metric' : None, 'timestamp': None, 'value': None, 'fields': {}, 'tags': {}}
	startTimestamp = None
	baseTimestamp = None
	timeOffset = 0
	writeCtr = 0
	lastWrite = 0
	# Class-level defaults kept for compatibility; __init__ gives every
	# instance its own containers so instances never share buffered state.
	metrics = []
	batchSize = 10
	dryRun = False
	metricWriteCounter = {}

	def __init__(self, timeOffset = 0):
		"""Store `timeOffset` and start with an empty buffer and counters."""
		self.timeOffset = timeOffset
		self.baseTimestamp = 0
		self.metrics = []
		self.metricWriteCounter = {}

	def connect(self):
		"""Placeholder; does nothing in the base class."""
		pass

	def timeFunc(self, match):
		"""Placeholder; always returns 0 in the base class."""
		return 0

	def fromUnixTimestamp(self, unixTimestamp):
		"""Placeholder; always returns 0 in the base class."""
		return 0

	def uniquefyTimestamps(self, metrics):
		"""Make the 'timestamp' values of `metrics` pairwise distinct, in place.

		A metric whose timestamp was already seen is moved forward by the
		smallest offset (at least 1, growing with every repeat of the same
		original value) that lands on a timestamp not used so far.
		"""
		seen = set()
		bumps = {}	# original timestamp -> last offset applied to it
		for metric in metrics:
			base = metric['timestamp']
			if base in seen:
				step = bumps.get(base, 0) + 1
				# Skip over values taken by other metrics as well.
				while base + step in seen:
					step += 1
				bumps[base] = step
				metric['timestamp'] = base + step
			seen.add(metric['timestamp'])

	def writeBatch(self, metrics):
		"""Placeholder; does nothing in the base class."""
		pass

	def metricFunc(self, json):
		"""Buffer one generic metric and flush once the batch is full.

		json -- generic metric dict; its 'metric' entry is also used to
		        count how many metrics of each name were submitted.
		"""
		self.metrics.append(json)
		# '>=' rather than '==': batchSize may be lowered while metrics are
		# already buffered, and the buffer must still get flushed.
		if len(self.metrics) >= self.batchSize:
			flushed = len(self.metrics)
			self.writeBatch(self.metrics)
			self.writeCtr += flushed
			self.metrics = []
			if self.writeCtr-self.lastWrite >= 1000 and not self.dryRun:
				print("wrote "+str(self.writeCtr-self.lastWrite)+" measurements. "+str(self.writeCtr)+" total.")
				self.lastWrite = self.writeCtr
		name = json['metric']
		self.metricWriteCounter[name] = self.metricWriteCounter.get(name, 0) + 1

	def finalize(self):
		"""Flush whatever is still buffered and print the write statistics."""
		if len(self.metrics) > 0:
			self.writeBatch(self.metrics)
			self.writeCtr += len(self.metrics)
			self.metrics = []
		print("wrote "+str(self.writeCtr)+" measurements total")
		for key in self.metricWriteCounter:
			print(key+': '+str(self.metricWriteCounter[key]))

	@staticmethod
	def printMetric(json):
		"""Write one generic metric to stdout as a single text line.

		Format: metric, timestamp and value followed by key=value pairs of
		the tags and then of the fields, all separated by single spaces.
		"""
		sys.stdout.write(str(json['metric'])+" "+str(json['timestamp'])+" "+str(json['value']))
		for key in json['tags']:
			sys.stdout.write(' '+str(key)+'='+str(json['tags'][key]))
		for key in json['fields']:
			sys.stdout.write(' '+str(key)+'='+str(json['fields'][key]))
		sys.stdout.write('\n')

	@staticmethod
	def printMetrics(metrics):
		"""Write every metric of `metrics` to stdout, one line each."""
		for m in metrics:
			IngestAdaptor.printMetric(m)
156

    
157
class InfluxAdaptor(IngestAdaptor):
	"""Ingest adaptor that writes metrics through an InfluxDBClient."""
	influxClient = None
	# Shape of one point handed to InfluxDBClient.write_points().
	influxJsonTemplate = { "measurement": None, "time": None, "fields": { "value": None }, "tags": {} }
	batchSize = 1000
	metrics = []

	def __init__(self, user, password, dbname, timeOffset = 0, host = 'localhost', port = 8086):
		"""Create the InfluxDB client for the given credentials and database."""
		super(InfluxAdaptor, self).__init__(timeOffset)
		# Per-instance buffer, so the class-level list is never mutated.
		self.metrics = []
		self.connect(host, port, user, password, dbname)
		self.baseTimestamp = 1234560000000 # millisec

	def connect(self, host, port, user, password, dbname):
		"""Instantiate the InfluxDBClient used by writeBatch()."""
		self.influxClient = InfluxDBClient(host, port, user, password, dbname)

	def timeFunc(self, match):
		"""Convert the 'timestamp' group of a regex match to a rebased time.

		The first timestamp seen becomes the origin; later ones are shifted
		so that the origin maps onto baseTimestamp.
		"""
		timestamp = int(match.group('timestamp'))
		# 'is None' instead of truthiness: a first timestamp of 0 is a valid
		# origin and must not be replaced by the next value.
		if self.startTimestamp is None:
			self.startTimestamp = timestamp
		unixTimestamp = self.baseTimestamp + (timestamp - self.startTimestamp)
		return unixTimestamp*1000000 # nanosec

	def fromUnixTimestamp(self, unixTimestamp):
		"""Convert a Unix timestamp in seconds to integer nanoseconds."""
		return int(unixTimestamp*1000000000) # nanosec

	def writeBatch(self, metrics):
		"""Write `metrics` to InfluxDB, or print them when dryRun is set.

		Timestamps are made unique in place before writing.
		"""
		self.uniquefyTimestamps(metrics)
		if self.dryRun:
			IngestAdaptor.printMetrics(metrics)
		else:
			# Convert the batch that was passed in, not self.metrics.
			batch = [self.toInfluxJson(m) for m in metrics]
			self.influxClient.write_points(batch)

	def toInfluxJson(self, genericJson):
		"""Build an InfluxDB point from a generic metric dict.

		The metric value is stored in the 'value' field; the generic
		'fields' and 'tags' entries are copied over unchanged.
		"""
		influxJson = deepcopy(self.influxJsonTemplate)
		influxJson['measurement'] = genericJson['metric']
		influxJson['time'] = genericJson['timestamp']
		influxJson['fields']['value'] = genericJson['value']
		influxJson['fields'].update(genericJson['fields'])
		influxJson['tags'].update(genericJson['tags'])
		return influxJson
206

    
207
class TsdbAdaptor(IngestAdaptor):
	"""Ingest adaptor that POSTs metrics as JSON to a TSDB HTTP endpoint."""
	# Shape of one data point in the JSON payload.
	tsdbJsonTemplate = { "metric": None, "timestamp": None, "value": None, "tags": {} }

	def __init__(self, timeOffset = 0, tsdbUri = 'http://localhost:4242/api/put?details', batchSize = 20):
		"""Remember the endpoint URI and the batch size."""
		super(TsdbAdaptor, self).__init__(timeOffset)
		self.tsdbUri = tsdbUri
		self.batchSize = batchSize
		self.baseTimestamp = 1234560000000

	def timeFunc(self, match):
		"""Convert the 'timestamp' group of a regex match to a rebased time.

		The first timestamp seen becomes the origin; later ones are shifted
		so that the origin maps onto baseTimestamp, plus timeOffset.
		"""
		timestamp = int(match.group('timestamp'))
		# 'is None' instead of truthiness: a first timestamp of 0 is a valid
		# origin and must not be replaced by the next value.
		if self.startTimestamp is None:
			self.startTimestamp = timestamp
		unixTimestamp = self.baseTimestamp + (timestamp - self.startTimestamp) + self.timeOffset
		return unixTimestamp

	def fromUnixTimestamp(self, unixTimestamp):
		"""Convert a Unix timestamp in seconds to integer milliseconds."""
		return int(unixTimestamp*1000) # millisec

	def writeBatch(self, metrics):
		"""POST `metrics` to the endpoint, or print them when dryRun is set.

		Any response status other than 200 is reported and terminates the
		program with exit status 1.
		"""
		if self.dryRun:
			IngestAdaptor.printMetrics(metrics)
		else:
			tsdbMetrics = [self.toTsdbJson(m) for m in metrics]
			response = requests.post(url=self.tsdbUri, data=json.dumps(tsdbMetrics), headers={'content-type': 'application/json'})
			if response.status_code != 200:
				print('return code '+str(response.status_code)+\
					'. aborting. info: '+response.text)
				# sys.exit() is always available; the bare exit() helper is
				# only installed by the site module.
				sys.exit(1)

	def toTsdbJson(self, genericJson):
		"""Build a TSDB data point from a generic metric dict.

		Generic fields are stored as tags; a tag with the same key as a
		field overrides it.
		"""
		tsdbJson = deepcopy(self.tsdbJsonTemplate)
		tsdbJson['metric'] = genericJson['metric']
		tsdbJson['value'] = genericJson['value']
		tsdbJson['timestamp'] = genericJson['timestamp']
		tsdbJson['tags'].update(genericJson['fields'])
		tsdbJson['tags'].update(genericJson['tags'])
		return tsdbJson
249

    
250
#******************************************************************************
251
def usage():
	"""Print the command line help, covering every option main() accepts."""
	prog = sys.argv[0]
	lines = [
		"usage: "+prog+" --username=<user name> --hub=<home hub label> [--no-ndncon, --no-nfd]",
		"",
		"the tool allows to track CPU and memory resources consumed by ndncon and/or NFD and ingest ",
		"this data into remote data base for real-time and historical analysis",
		"",
		"\texample:",
		"\t\t"+prog+" --username=peter --hub=remap",
		"\toptions:",
		"\t\t--no-nfd:\ttrack ndncon resources only",
		"\t\t--no-ndncon:\ttrack NFD resources only",
		"\t\t--dry-run:\tprint metrics instead of sending them to the data base",
		"\t\t--host=<host>:\tdata base host (default: localhost)",
		"\t\t--port=<port>:\tdata base port (default: 4242 for TSDB, 8086 for InfluxDB)",
		"\t\t--influx-adaptor:\tingest into InfluxDB instead of TSDB; requires --iuser, --ipassword and --idb",
		"\t\t--iuser=<user>:\tInfluxDB user name",
		"\t\t--ipassword=<password>:\tInfluxDB password",
		"\t\t--idb=<db name>:\tInfluxDB data base name",
		"\texamples:",
		"\t\ttrack CPU and memory consumption from ndncon only:",
		"\t\t\t"+prog+" --username=peter --hub=remap --no-nfd",
		"\t\ttrack CPU and memory consumption from NFD only:",
		"\t\t\t"+prog+" --username=peter --hub=remap --no-ndncon",
	]
	# A single parenthesised argument behaves the same whether print is a
	# statement or a function.
	print("\n".join(lines))
267

    
268
def main():
	"""Parse the command line, build the ingest adaptor and start tracking.

	Exits with status 2 when the arguments are invalid or incomplete.
	Otherwise run() is started, unless tracking of both processes was
	switched off.
	"""
	longOptions = ["username=", "hub=", "no-ndncon", "no-nfd", "dry-run", "iuser=", "ipassword=", "idb=", "influx-adaptor", "port=", "host="]
	try:
		opts, args = getopt.getopt(sys.argv[1:], "", longOptions)
	except getopt.GetoptError as err:
		print(str(err))
		usage()
		sys.exit(2)

	# ingestion parameters
	port = None
	host = 'localhost'
	tsdbPort = 4242
	influxPort = 8086
	useTsdbAdaptor = True
	dryRun = False
	influx_username = None
	influx_password = None
	influx_dataBaseName = None

	username = None
	hubLabel = None
	trackNdncon = True
	trackNfd = True
	for o, a in opts:
		# Compare with '==': `o in ("--x")` is a substring test against a
		# plain string, not membership in a tuple.
		if o == "--username":
			username = a
		elif o == "--hub":
			hubLabel = a
		elif o == "--no-ndncon":
			trackNdncon = False
		elif o == "--no-nfd":
			trackNfd = False
		elif o == "--dry-run":
			dryRun = True
		elif o == "--port":
			try:
				port = int(a)
			except ValueError:
				print("invalid port: "+a)
				usage()
				sys.exit(2)
		elif o == "--host":
			host = a
		elif o == "--influx-adaptor":
			useTsdbAdaptor = False
		elif o == "--iuser":
			influx_username = a
		elif o == "--ipassword":
			influx_password = a
		elif o == "--idb":
			influx_dataBaseName = a
		else:
			# Raised explicitly so the check survives `python -O`.
			raise AssertionError("unhandled option "+o)
	if not (username and hubLabel):
		usage()
		sys.exit(2)

	if not useTsdbAdaptor and not (influx_username and influx_password and influx_dataBaseName):
		# The supplied values are deliberately not echoed: they include the
		# data base password.
		print("asked for influx adaptor, but didn't provide username, password and db name. aborting")
		sys.exit(2)

	if useTsdbAdaptor:
		if port: tsdbPort = port
		ingestAdaptor = TsdbAdaptor(timeOffset=0, tsdbUri='http://{0}:{1}/api/put?details'.format(host, tsdbPort))
	else:
		if port: influxPort = port
		ingestAdaptor = InfluxAdaptor(user=influx_username, password=influx_password, dbname=influx_dataBaseName, timeOffset=0, host=host, port=influxPort)

	# One batch per sampled process: every sample yields exactly one metric
	# per tracked resource.
	ingestAdaptor.batchSize = len(resourcesToTrack)
	ingestAdaptor.dryRun = dryRun

	if trackNdncon or trackNfd:
		run(username, hubLabel, trackNdncon, trackNfd, ingestAdaptor)
339

    
340
# Start the tracker only when the file is executed as a script.
if __name__ == '__main__':
	main()
(1-1/4)