# proxmaster/grid.py
# -*- coding: utf-8 -*-
#
# vdc
#import site packages
import base64
import json
import re
import datetime
import random
import netaddr
#import local packages
import utils
import plugin
import ioconfig
import clientsdb
logger = ioconfig.logger
config = ioconfig.parser
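
#NOTE: sync() below discovers regions and slaves from the ini file parsed by
#      ioconfig.parser. A minimal sketch of the layout it expects, assuming
#      section names like region_0 / slave_0 (all values are illustrative only;
#      the keys are taken from the config.get() calls in this module):
#
#   [general]
#   vmid_min = 100000
#   vmid_max = 199999
#
#   [region_0]
#   name = Plovdiv, Bulgaria
#   ipv4_min = 192.168.0.10
#   ipv4_max = 192.168.0.250
#
#   [slave_0]
#   name = proxmox1
#   masterip = 192.168.0.2
#   password = secret
#   regionid = 0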

def sync(cached=True):
    """ calls slave objects and mixes their nodes in a common cluster grid """
    a = datetime.datetime.now()
    grid_vmid_min = config.get('general', 'vmid_min')
    grid_vmid_max = config.get('general', 'vmid_max')
    real_grid = {'name':'real', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max }
    cache_grid = {'name':'cache', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max }

    regionselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'region' + r'\W*', x)]
    for ri in range(len(regionselector)):
        region_section = config.sections()[int(regionselector[ri])]
        region_id = region_section.split("_")[1]
        region_name = config.get(region_section, 'name')
        region_range_min = config.get(region_section, 'ipv4_min')
        region_range_max = config.get(region_section, 'ipv4_max')
        slaveselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'slave' + r'\W*', x)]
        real_region = { "id":region_id, "region":region_name, "ipv4_min":region_range_min, "ipv4_max":region_range_max }
        cache_region = real_region.copy() #same region both in cache and status

        for si in range(len(slaveselector)):
            slave_section = config.sections()[int(slaveselector[si])]
            slave_id = slave_section.split("_")[1]
            slave_name = config.get(slave_section, 'name')
            slave_masterip = config.get(slave_section, 'masterip')
            slave_password = config.get(slave_section, 'password')
            slave_regionid = config.get(slave_section, 'regionid')
            enc_slave_password = base64.b64encode(slave_password.encode('ascii')) #encode base64 in grid
            decoded_password = enc_slave_password.decode('ascii')
            real_slave = { "id":slave_id, "slave":slave_name, "masterip":slave_masterip, "password":decoded_password }
            optional_slave = {}
            cache_file = 'cache-slave-' + slave_id + '.json'
            prefix = 'slave[' + slave_name + ']> '

            # check if slave is in current region and include it in current dict if it is
            if slave_regionid == region_id:
                try:
                    #trying to connect to slave host
                    #vmlist = plugin.vmlist(slave_id, slave_masterip, enc_slave_password.decode('utf-8'))
                    proxobject = plugin.auth(slave_id, slave_masterip, enc_slave_password)
                    vmlist = plugin.vmlist(proxobject)
                    real_slave['alive'] = 'up'
                    logger.info(prefix + 'is online')
                except:
                    #raise
                    #slave cant be read so it will be marked down.
                    real_slave['alive'] = 'down'
                    logger.warning(prefix + 'is down')

                if real_slave['alive'] == 'up':
                    #populate grid with vms then
                    for vm in vmlist:
                        #static parameters that CAN go to the cache:
                        vm_id = vm['vmid']
                        vm_name = vm['name']
                        vm_owner = clientsdb.vmowner(vm_id, vm_name, cached) #read clientsdb cache
                        static_vm = { "vmid":str(vm_id), "hostname":vm_name, 'type':vm['vmtype'], 'owner':vm_owner }
                        real_slave[str(vm_id)] = static_vm
                        #dynamic parameters that SHOULD NOT go to the cache:
                        dynamic_vm = { "uptime":vm['uptime'] }
                        optional_slave[str(vm_id)] = dynamic_vm

                    #check current cache
                    cache_slave = real_slave.copy() #fallback to the current state
                    try:
                        with open(cache_file) as fcr:
                            cache_slave = json.load(fcr)
                    except:
                        logger.info(prefix + 'does not exist in cache. Initializing...')

                    if cache_slave['alive'] == 'up':
                        #slave was not down so it must be up...
                        cache_slave = update_cache(real_slave, cache_file, prefix, 'up')
                        logger.info(prefix + 'sync success o/')
                    else:
                        #if the slave was down before, compare the state before overwriting the cache
                        cache_slave['alive'] = 'up' #even if alive status in cache is still down we ignore it by forcing it to up
                        logger.info(prefix + 'was down')
                        #show the differences in the log for manual (or maybe automatic at some point) fixing
                        findDiff(cache_slave, real_slave)
                        if cache_slave != real_slave:
                            logger.warning(prefix + 'cache != current status. please restore host!')
                            cache_slave = update_cache(cache_slave, cache_file, prefix, 'down')
                        else:
                            logger.info(prefix + 'cache == current status. host restored. o/')
                            cache_slave = update_cache(cache_slave, cache_file, prefix, 'up')

                #what to do with cache if host is down
                if real_slave['alive'] == 'down':
                    try:
                        logger.warning(prefix + 'loading cache...')
                        with open(cache_file) as fscr:
                            cache_slave = json.load(fscr)
                        logger.warning(prefix + '...done')
                        cache_slave = update_cache(cache_slave, cache_file, prefix, 'down')
                    except:
                        logger.error(prefix + 'sync failure!')
                        cache_slave = real_slave.copy()
                        #raise

                #we safely mix the dynamic ids now that we dont deal with cache anymore
                mergedslave = utils.dict_merge({}, real_slave, optional_slave)
                real_region[slave_id] = mergedslave
                cache_region[slave_id] = cache_slave

        #the region is finally included in the grid
        real_grid[region_id] = real_region
        cache_grid[region_id] = cache_region

    b = datetime.datetime.now()
    real_grid["synctime"] = str(b-a)

    #dump all data to json
    WriteCache(cache_grid, 'grid-cache.json')
    WriteCache(real_grid, 'grid-real.json')

    if cached == True:
        return cache_grid
    else:
        return real_grid
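
#NOTE: rough shape of the grid dict that sync() builds and dumps to
#      grid-real.json / grid-cache.json. The keys are the ones assigned above;
#      the concrete values are illustrative only:
#
#   { "name": "real", "vmid_min": "100000", "vmid_max": "199999",
#     "synctime": "0:00:01.234567",
#     "0": {                                       <- region_id
#       "id": "0", "region": "Plovdiv, Bulgaria",
#       "ipv4_min": "192.168.0.10", "ipv4_max": "192.168.0.250",
#       "0": {                                     <- slave_id
#         "id": "0", "slave": "proxmox1", "masterip": "192.168.0.2",
#         "password": "<base64>", "alive": "up",
#         "147344": { "vmid": "147344", "hostname": "vm1.example.com",
#                     "type": "kvm", "owner": "client1", "uptime": 3600 }
#       }
#     }
#   }
#
#   grid-cache.json has the same nesting, but without the dynamic per-vm
#   fields (uptime) and without synctime.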

def update_cache(cachedata, cachefile, prefix, newstatus):
    """ update administrative status """
    cachedata['alive'] = newstatus
    WriteCache(cachedata, cachefile)
    #TODO send mail
    logger.info(prefix + 'administratively ' + newstatus)
    return cachedata

def WriteCache(src_data, cache_file):
    """ dump the given data structure to a json file """
    with open(cache_file, 'w') as fcw:
        json.dump(src_data, fcw)

def query_region(region_name):
    """ translate region name to region id """
    grid_data = readcache()
    all_regions = []
    for element in grid_data:
        try:
            if str(element) == grid_data[element]['id']:
                all_regions.append(element)
        except:
            continue
    for region in all_regions:
        if grid_data[region]['region'] == region_name:
            logger.info('region[{}]> region id={}'.format(region_name, region))
            return grid_data[region]['id']
    logger.error('grid> cant find region ' + region_name)
    return "-1"

def query_happiness(region_id):
    """ analyzes grid data for the requested region and returns proposed slave_id,
        based on a "happiness" factor. happiness means alive and free :) """
    grid_data = readcache()
    grid_data = grid_data[str(region_id)]
    all_slaves = []
    for element in grid_data:
        try:
            if str(element) == grid_data[element]['id']:
                all_slaves.append(element)
        except:
            continue
    all_slaves = [ int(x) for x in all_slaves ] #convert values from str to int
    alive_slaves = []
    for slaveid in all_slaves:
        if str(grid_data[str(slaveid)]['alive']) == 'up':
            alive_slaves.append(slaveid)
    logger.info('region[{}]> alive slaves {}'.format(str(region_id), str(alive_slaves)))
    #happy_slave = random.choice(alive_slaves)
    if len(alive_slaves) < 1:
        logger.error('region[{}]> grid is full. add more slaves'.format(str(region_id)))
    else:
        happy_slave = 1 #TODO: analyze slaves and make informed decision.
        logger.info('region[{}]> {} selected'.format(str(region_id), str(happy_slave)))
        return happy_slave

def generate_ipv4(region_id, how_many=1):
    """ analyzes cached grid data and returns ip addresses for new machines. """
    grid_data = readcache()
    ip_range_min = grid_data[str(region_id)]['ipv4_min']
    ip_range_max = grid_data[str(region_id)]['ipv4_max']
    region_ipset = netaddr.IPSet(netaddr.IPRange(ip_range_min, ip_range_max))
    region_ips = []
    for ip in region_ipset:
        region_ips.append(ip)
    ip_min = 0
    ip_max = len(region_ips) - 1
    tested_ips = [] #initialize ip cache
    requested_ips = []
    all_ips = utils.find_rec(grid_data, 'ipaddr')

    for ips in range(int(how_many)):
        counter = 0
        while True:
            if counter == 50:
                logger.error('region[{}]> ip range full'.format(str(region_id)))
                return None
            else:
                counter += 1
            requested_ip_index = random.randint(ip_min, ip_max)
            requested_ip = str(region_ips[requested_ip_index])
            if requested_ip in tested_ips:
                logger.warning('region[{}]> ip address {} already tested. cache: {}'.format(str(region_id), str(requested_ip), str(tested_ips)))
                continue
            if requested_ip in requested_ips:
                logger.warning('region[{}]> ip address {} already generated.'.format(str(region_id), str(requested_ip)))
                tested_ips.append(requested_ip)
                continue
            if requested_ip in all_ips:
                position = all_ips.index(requested_ip)
                logger.warning('region[{}]> ip address {} already exist. location: {}'.format(str(region_id), str(requested_ip), str(position)))
                tested_ips.append(requested_ip)
                continue
            else:
                tested_ips = [] #clear ip cache
                break
        logger.info('region[{}]> ip address {} selected.'.format(str(region_id), str(requested_ip)))
        requested_ips.append(requested_ip)

    logger.info('region[{}]> ip addresses {} selected.'.format(str(region_id), str(requested_ips)))
    return requested_ips
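
#NOTE: illustrative example (hypothetical values): with ipv4_min=192.168.0.10
#      and ipv4_max=192.168.0.20 in the region cache, netaddr.IPRange yields 11
#      candidate addresses, and generate_ipv4(0, 2) randomly picks two of them
#      that do not collide with any 'ipaddr' value already stored in the grid.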

def generate_vmid():
    """ analyzes cached grid data and returns proposed vmid for new machines """
    grid_data = readcache()
    tested_vmids = [] #initialize id cache
    id_min = grid_data['vmid_min']
    id_max = grid_data['vmid_max']
    all_vmid = utils.find_rec(grid_data, 'vmid') #get all vmid values from the nested grid
    all_vmid = [ int(x) for x in all_vmid ] #convert values from str to int (its a vmid right?)
    counter = 0
    while True:
        if counter == 50:
            logger.error('grid> vmid range full')
            return None
        else:
            counter += 1
        requested_vmid = random.randint(int(id_min), int(id_max)) #max 90k machines
        if requested_vmid in tested_vmids:
            logger.warning('grid> vmid ' + str(requested_vmid) + ' already tested. cache:' + str(tested_vmids))
            continue
        if requested_vmid in all_vmid:
            position = all_vmid.index(requested_vmid)
            logger.warning('grid> vmid ' + str(requested_vmid) + ' already exist. location:' + str(position))
            tested_vmids.append(requested_vmid)
        else:
            tested_vmids = [] #clear tested vmid cache
            break
    logger.info('grid> vmid ' + str(requested_vmid) + ' selected')
    return requested_vmid
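
#NOTE: illustrative example: with vmid_min=100000 and vmid_max=199999 in the
#      cached grid, generate_vmid() keeps drawing random integers in that range
#      until it finds one that is not already used as a 'vmid' anywhere in the
#      grid, giving up after 50 attempts.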

def query_slave_data(slave_id):
    """ read the cache for the requested slave_id """
    grid_data = readcache()
    all_regions = []
    for element in grid_data:
        try:
            if str(element) == grid_data[element]['id']:
                all_regions.append(element)
        except:
            continue
    for region in all_regions:
        cslave = grid_data[region]
        result_slave = cslave.get(str(slave_id)) #select the slave from all regions
        if result_slave is not None:
            return result_slave
    return {}

def query_vm(req_vmid):
    """ returns slave_id and vm_type for the requested vmid """
    #read current state (no cache)
    sync(False)
    grid_data = readreal()
    #search for the requested vmid
    #TODO: maybe we should also check if the owner has permissions to manage it,
    ###### if for some reason the admin panel is compromised.
    all_vmid = utils.find_rec(grid_data, 'vmid')
    target = int(0)
    for running_vmid in all_vmid:
        if str(req_vmid) == str(running_vmid):
            target = running_vmid
            break
        else:
            continue
    if target == 0:
        logger.error('grid> vmid {} cannot be found!'.format(req_vmid))
        return int(-1), "None"
    path = utils.get_path(grid_data, target)
    region_id = path[0]
    slave_id = path[1]
    try:
        vm_type = grid_data[str(region_id)][str(slave_id)][str(target)]['type']
    except:
        logger.error('vm[{}]> type is unknown!'.format(req_vmid))
        raise
    #logger.info('vm[{}]> type={} path={}'.format(target, vm_type, str(path)))
    return slave_id, vm_type
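
#NOTE: utils.get_path() is expected to return the nested key path leading to the
#      vmid inside the grid, whose first two elements are the region_id and the
#      slave_id, e.g. something like ['0', '0', ...] for region 0 / slave 0
#      (illustrative; the exact return format is defined in utils).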

def findDiff(d1, d2, path=""):
    """ log the differences between two nested dicts """
    for k in d1.keys():
        if k not in d2.keys():
            logger.warning('cache> ' + str(k) + ' as key not in d2')
        else:
            if type(d1[k]) is dict:
                if path == "":
                    new_path = k
                else:
                    new_path = path + "->" + k
                findDiff(d1[k], d2[k], new_path)
            else:
                if d1[k] != d2[k]:
                    logger.warning('cache> ' + str(k) + ' ' + str(d1[k]) + ' [-]')
                    logger.warning('cache> ' + str(k) + ' ' + str(d2[k]) + ' [+]')

def readcache():
    """ read the last saved cache and return its contents """
    try:
        with open('grid-cache.json') as gridfile:
            grid_data = json.load(gridfile)
    except:
        grid_data = {}
        logger.error('cache> cannot read cache file')
    return grid_data

def readreal():
    """ read the current state and return its contents """
    try:
        with open('grid-real.json') as gridfile:
            grid_data = json.load(gridfile)
        resulttime = grid_data['synctime']
        logger.info('grid> sync for ' + resulttime)
    except:
        grid_data = {}
        logger.error('cache> cannot read temp file')
    return grid_data

if __name__ == '__main__':
    #print(sync())
    #print(query_region('Plovdiv, Bulgaria'))
    #print(query_happiness(0))
    #print(generate_ipv4(0,3))
    #print(generate_vmid())
    #print(query_slave_data(0))
    print(query_vm(147344))