import csv import re import os import argparse import json import time import datetime import requests import asyncio import logging import logging.handlers #from memory_profiler import profile #from pympler.tracker import SummaryTracker def follow_file(fname, cnt): """ Tail a given file and returns each line to the generator caller """ cur_file = open(fname, "r") cur_ino = os.fstat(cur_file.fileno()).st_ino while True: while True: line = cur_file.readline() cnt = cnt + 1 if not line: break yield cnt, line try: if os.stat(fname).st_ino != cur_ino: new_file = open(fname, "r") cur_file.close() cur_file = new_file cur_ino = os.fstat(cur_file.fileno()).st_ino continue except IOError: logging.warning("{} IOError".format(fname)) pass time.sleep(WAIT_FILE_SEC) def process_line(line, idx): """ Process one line at time to create a REST payload message """ ev_dict = {} line = line.replace('\"', '') pat = r"(\w+=.*?)," fields = re.findall(pat, line, flags=0) for field in fields: try: key, value = field.split('=',1) if (key in ev_dict): next else: ev_dict[key] = value except: next try: ev_host = ev_dict['entity'].split('-')[-1] ev_event = ev_dict['event'] ev_severity = ev_dict['eventSeverity'] ev_time = ev_dict['time'] ev_time = ev_time.replace(']', '') ev_type = ev_dict['eventType'] ev_description = ev_dict['description'] ev_id = "{}-{}".format(line.split(' ')[0], idx) ev_entity = ev_dict['entity'] ev_pub = ev_dict['publisher'] ev_state = ev_dict['state'] except: logging.warning(line) return None payload = { 'app_key': APP_KEY, 'status': ev_severity, 'host': ev_host, 'timestamp': ev_time, 'check' : ev_event, 'description': ev_description, 'my_unique_attribute': ev_id, 'entity' : ev_entity, 'publisher' : ev_pub, 'state' : ev_state, 'supportcard' : ev_supportcard, 'wiki' : ev_wiki } return payload #@profile def send_payload(payload, idx): """ Send a single data payload to via REST API """ url = "https://bpcom/nom/dev/events" #ret_msg = {} st = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") response = requests.post(url, headers=HEADERS, data=json.dumps(payload)) et = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") logging.info("{} {} {} {} {}".format(idx, payload['my_unique_attribute'], response.status_code, st, et)) #return ret_msg if __name__ == '__main__': parser = argparse.ArgumentParser(description= "feeder") parser.add_argument("--test", "-t", action='store_true', help='test') parser.add_argument("--infile", "-i", type=str, help="manual input file") parser.add_argument("--start", "-s", type=int, help="start sending message at #") parser.add_argument("--feed", "-f", type=str, help='FEED: \n") args = parser.parse_args() if (args.infile is not None): amber_file = args.infile logfile = "amber_panda.log" else: amber_feed = args.feed if (amber_feed is None): amber_feed = 'sentinel' amber_file = AMBERFILES[amber_feed] logfile = LOGFILES[amber_feed] if (args.start is not None): start_msg_idx = args.start else: start_msg_idx = 0 logging.basicConfig(filename=logfile, level=logging.INFO, format='%(asctime)s %(levelname)-8s %(message)s', datefmt ='%Y-%m-%d %H:%M:%S') logging.info("Starts") i = 0 loop = asyncio.get_event_loop() for i, ev_line in follow_file(amber_file, i): payload = process_line(ev_line, i) if (payload is not None and i >= start_msg_idx): future = loop.run_in_executor(None, send_payload, payload, i) logging.info("Parsing AMBER Message {}".format(i)) else: if (payload is None): logging.warning("{} Invalid Payload".format(i)) else: if (i % 100 == 0): logging.info("Skip {}... Messages".format(i)) if (args.test): if (i > TEST_MSG_CNT): break del payload time_end_str = datetime.datetime.now().strftime("%m/%d/%Y %H:%M:%S") logging.info("End At " + time_end_str)
Möchten Sie kostenlos Ihre eigenen Notizen mit GoConqr erstellen? Mehr erfahren.