#. -*- coding: utf-8 # # vdc #import site packages import base64 import json import re import datetime import random import netaddr #import local packages import utils import plugin import ioconfig import clientsdb logger = ioconfig.logger config = ioconfig.parser def sync(cached=True): """ calls slave objects and mix their nodes in a common cluster grid """ a = datetime.datetime.now() grid_vmid_min = config.get('general', 'vmid_min') grid_vmid_max = config.get('general', 'vmid_max') real_grid = {'name':'real', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max } cache_grid = {'name':'cache', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max } regionselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'region' + r'\W*', x)] for ri in range(len(regionselector)): region_section = config.sections()[int(regionselector[ri])] region_id = region_section.split("_")[1] region_name = config.get(region_section, 'name') region_range_min = config.get(region_section, 'ipv4_min') region_range_max = config.get(region_section, 'ipv4_max') slaveselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'slave' + r'\W*', x)] real_region = { "id":region_id, "region":region_name, "ipv4_min":region_range_min, "ipv4_max":region_range_max } cache_region = real_region.copy() #same region both in cache nad status for si in range(len(slaveselector)): slave_section = config.sections()[int(slaveselector[si])] slave_id = slave_section.split("_")[1] slave_name = config.get(slave_section, 'name') slave_masterip = config.get(slave_section, 'masterip') slave_password = config.get(slave_section, 'password') slave_regionid = config.get(slave_section, 'regionid') enc_slave_password = base64.b64encode(slave_password.encode('ascii')) #encode base64 to avoid shoulder surfers decoded_password = enc_slave_password.decode('utf-8') real_slave = { "id":slave_id, "slave":slave_name, "masterip":slave_masterip, "password":decoded_password } optional_slave = {} cache_file = 'cache-slave-' + slave_id + '.json' prefix = 'cache> [' + slave_id + '] ' # check if slave is in current region and include it in current dict if it is if slave_regionid == region_id: try: #trying to connect to slave host #vmlist = plugin.vmlist(slave_id, slave_masterip, enc_slave_password.decode('utf-8')) proxobject = plugin.auth(slave_id, slave_masterip, enc_slave_password) vmlist = plugin.vmlist(proxobject) real_slave['alive'] = 'up' logger.info(prefix + 'is up') except: #raise #slave cant be read so it will be marked down. real_slave['alive'] = 'down' logger.warning(prefix + 'is down') if real_slave['alive'] == 'up': #populate grid with vms then for vm in vmlist: #static parameters that CAN go to to cache: vm_id = vm['vmid'] vm_name = vm['name'] vm_owner = clientsdb.vmowner(vm_id, vm_name, cached) #read clientsdb cache static_vm = { "vmid":str(vm_id), "hostname":vm_name, 'type':vm['vmtype'], 'owner':vm_owner } real_slave[str(vm_id)] = static_vm #dynamic parameters that SHOULD NOT go to the cache: dynamic_vm = { "uptime":vm['uptime'] } optional_slave[str(vm_id)] = dynamic_vm #check current cache cache_slave = real_slave.copy() #fallback to the current state try: with open(cache_file) as fcr: cache_slave = json.load(fcr) fcr.close() except: logger.info(prefix + 'does not exist in cache. Initializing...') if cache_slave['alive'] == 'up': #slave was not down so it must be up... cache_slave = update_cache(real_slave, cache_file, prefix, 'up') logger.info(prefix + 'sync success o/') else: #if the slave was down before, compare the state before overwriting the cache cache_slave['alive'] = 'up' #even if alive status in cache is still down we ignore it by forcing it to up logger.info(prefix + 'was down') #show the differences in log for manual (or maybe automatic at some point fixing) findDiff(cache_slave, real_slave) if cache_slave != real_slave: logger.warning(prefix + 'cache != current status. please restore host!') cache_slave = update_cache(cache_slave, cache_file, prefix, 'down') else: logger.info(prefix + 'cache == current status. host restored. o/') cache_slave = update_cache(cache_slave, cache_file, prefix, 'up') #what to do with cache if host is down if real_slave['alive'] == 'down': try: logger.warning(prefix + 'loading cache...') with open(cache_file) as fscr: cache_slave = json.load(fscr) fscr.close() logger.warning(prefix + '...done') cache_slave = update_cache(cache_slave, cache_file, prefix, 'down') except: logger.error(prefix + 'sync failure!') cache_slave = real_slave.copy() #raise #we safely mix the dynamic ids now that we dont deal with cache anymore mergedslave = utils.dict_merge({}, real_slave, optional_slave) real_region[slave_id] = mergedslave cache_region[slave_id] = cache_slave #the region is finally included in the grid real_grid[region_id] = real_region cache_grid[region_id] = cache_region b = datetime.datetime.now() real_grid["synctime"] = str(b-a) #dump all data to json WriteCache(cache_grid, 'grid-cache.json') WriteCache(real_grid, 'grid-real.json') if cached == True: return cache_grid else: return real_grid def update_cache(cachedata, cachefile, prefix, newstatus): """ update administravite status """ cachedata['alive'] = newstatus WriteCache(cachedata, cachefile) #TODO send mail logger.info(prefix + 'administratively ' + newstatus) return cachedata def WriteCache(src_data, cache_file): with open(cache_file, 'w') as fcw: json.dump(src_data, fcw) fcw.close() def query_region(region_name): """ translate region name to region id """ grid_data = readcache() all_regions = [] for element in grid_data: try: if str(element) == grid_data[element]['id']: all_regions.append(element) except: continue for region in all_regions: if grid_data[region]['region'] == region_name: logger.info('region[{}]> found: id={}'.format(region_name, region)) return grid_data[region]['id'] break logger.error('grid> cant find region ' + region_name) return "-1" def query_happiness(region_id): """ analyzes grid data for the reuqested region and returns proposed slave_id, based on a "happiness" factor. happiness means alive and free :) """ grid_data = readcache() grid_data = grid_data[str(region_id)] all_slaves = [] for element in grid_data: try: if str(element) == grid_data[element]['id']: all_slaves.append(element) except: continue all_slaves = [ int(x) for x in all_slaves ] #convert values from str to int alive_slaves = [] for slaveid in all_slaves: if str(grid_data[str(slaveid)]['alive']) == 'up': alive_slaves.append(slaveid) logger.info('grid> alive slaves ' + str(alive_slaves)) #happy_slave = random.choice(alive_slaves) if len(alive_slaves) < 1: logger.error('grid> grid is full. add more slaves') else: happy_slave = 1 #TODO: analyze slaves and make informed decision. logger.info('grid> ' + str(happy_slave) + ' selected') return happy_slave def generate_ipv4(region_id, how_many=1): """ analyzes cached grid data and returns ip addresses for new machines. """ grid_data = readcache() ip_range_min = grid_data[str(region_id)]['ipv4_min'] ip_range_max = grid_data[str(region_id)]['ipv4_max'] region_ipset = netaddr.IPSet(netaddr.IPRange(ip_range_min, ip_range_max)) region_ips = [] for ip in region_ipset: region_ips.append(ip) ip_min = 0 ip_max = len(region_ips) - 1 tested_ips = [] #initialize ip cache requested_ips = [] all_ips = utils.find_rec(grid_data, 'ipaddr') for ips in range(int(how_many)): counter = 0 while True: if counter == 50: logger.error('grid> ip range full') return None else: counter += 1 requested_ip_index = random.randint(ip_min, ip_max) requested_ip = str(region_ips[requested_ip_index]) if requested_ip in tested_ips: logger.warning('grid> ip address ' + str(requested_ip) + ' already tested. cache: ' + str(tested_ips)) continue if requested_ip in requested_ips: logger.warning('grid> ip address ' + str(requested_ip) + ' already generated') tested_ips.append(requested_ip) continue if requested_ip in all_ips: position = used_ips.index(requested_ip) logger.warning('grid> ip address ' + str(requested_ip) + ' already exist. location:' + str(position)) tested_ips.append(requested_ip) continue else: tested_ips = [] #clear ip cache break logger.info('grid> ip address ' + requested_ip + ' selected') requested_ips.append(requested_ip) logger.info('grid> ip addresses ' + str(requested_ips) + ' selected') return requested_ips def generate_vmid(): """ analyzes cached grid data and return proposed vmid for new machines """ grid_data = readcache() tested_vmids = [] #initialize id cache id_min = grid_data['vmid_min'] id_max = grid_data['vmid_max'] all_vmid = utils.find_rec(grid_data, 'vmid') #get all vmid values from the nested grid all_vmid = [ int(x) for x in all_vmid ] #convert values from str to int (its a vmid right?) counter = 0 while True: if counter == 50: logger.error('grid> ip range full') return None else: counter += 1 requested_vmid = random.randint(int(id_min), int(id_max)) #max 90k machines if requested_vmid in tested_vmids: logger.warning('grid> vmid ' + str(requested_vmid) + ' already tested. cache:' + str(tested_vmids)) continue if requested_vmid in all_vmid: position = all_vmid.index(requested_vmid) logger.warning('grid> vmid ' + str(requested_vmid) + ' already exist. location:' + str(position)) tested_vmids.append(requested_vmid) else: tested_vmids = [] #clear tested vmid cache break logger.info('grid> vmid ' + str(requested_vmid) + ' selected') return requested_vmid def query_slave_data(slave_id): """ read the cache for the requested slave_id """ grid_data = readcache() all_regions = [] for element in grid_data: try: if str(element) == grid_data[element]['id']: all_regions.append(element) except: continue try: for region in all_regions: cslave = grid_data[region] result_slave = cslave[str(slave_id)] #select the slave from all regions if result_slave != None: return result_slave break except: raise return {} def query_vm(req_vmid): """ returns slave_id and vm_type for the requested vmid """ #read current state (no cache) sync(False) grid_data = readreal() #search for the requested vmid #TODO: maybe we should also check if the owner has permissions to manage it, ###### if for some reason the admin panel is compromised. all_vmid = utils.find_rec(grid_data, 'vmid') target = int(0) for running_vmid in all_vmid: if str(req_vmid) == str(running_vmid): target = running_vmid break else: continue if target == 0: logger.error('grid> vmid {} cannot be found!' + str(req_vmid)) return int(-1), "None" path = utils.get_path(grid_data, target) region_id = path[0] slave_id = path[1] try: vm_type = grid_data[str(region_id)][str(slave_id)][str(target)]['type'] except: logger.error('vm[{}]> type is unknown!'.format(vm_id)) raise logger.info('vm[{}]> type {} found. path=region={} found.'.format(target, vm_type, str(path))) return slave_id, vm_type def findDiff(d1, d2, path=""): for k in d1.keys(): if not k in d2.keys(): logger.warning('cache> ' + str(k) + ' as key not in d2') else: if type(d1[k]) is dict: if path == "": path = k else: path = path + "->" + k findDiff(d1[k],d2[k], path) else: if d1[k] != d2[k]: logger.warning('cache> ' + str(k) + ' ' + str(d1[k]) + ' [-]') logger.warning('cache> ' + str(k) + ' ' + str(d2[k]) + ' [+]') def readcache(): """ read the last saved cache and return its contents """ try: with open('grid-cache.json') as gridfile: grid_data = json.load(gridfile) gridfile.close() except: grid_data = {} logger.error('cache> cannot read cache file') return grid_data def readreal(): """ read the current state and return its contents """ try: with open('grid-real.json') as gridfile: grid_data = json.load(gridfile) gridfile.close() resulttime = grid_data['synctime'] logger.info('grid> sync for ' + resulttime) except: grid_data = {} logger.error('cache> cannot read temp file') return grid_data if __name__ == '__main__': #print(sync()) #print(query_region('Plovdiv, Bulgaria')) #print(query_happiness(0)) #print(generate_ipv4(0,3)) #print(generate_vmid()) #print(query_slave_data(0)) print(query_vm(483039)) print(query_vm(147344))