commit ff55f10a9533538010ece6170ce462d9085d671f Author: Daniel afx Date: Mon Feb 15 12:30:43 2016 +0200 First. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f54d41a --- /dev/null +++ b/.gitignore @@ -0,0 +1,66 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +#proxmaster custom ignores + +log/ +config.ini +*.json + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3ee9ff6 --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +Copyright (c) 2015 deflax.net + +This software is provided 'as-is', without any express or implied +warranty. In no event will the authors be held liable for any damages +arising from the use of this software. + +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it +freely, subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgement in the product documentation would be + appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..ddc790e --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +#Proxmaster +Python RESTful API for managing a grid of vm slaves + +##Installation instructions: +``` +1. sudo pip3 install -r requirements.txt +2. create config.ini with the following format: +3. chmod +x start.sh +4. create nginx vhost via the provided template files: +- config.ini.dist +- nginx_example_vhost.txt +5. o/ +``` diff --git a/clientsdb.py b/clientsdb.py new file mode 100644 index 0000000..1d466ec --- /dev/null +++ b/clientsdb.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 +# +# manage clientsdb.json + +#import site packages +import json + +#import local packages +import ioconfig +import utils + +def addclient(vmid, vmname, clientid, clientname): + """ add new client to the clientsdb.json """ + clientsdb = readclientsdb() + if str(clientid) in clientsdb: + ioconfig.logger.info('clients> client ' + clientid + ' already exists. merging.') + else: + ioconfig.logger.info('clients> client ' + clientid + ' does not exist. creating.') + vcard = { 'name':str(clientname) } + newclient = { str(clientid):vcard } + clientsdb.update(newclient) + + ioconfig.logger.info('clients> vmid ' + vmid + ' will be owned by ' + clientid + ' (' + clientname + ')') + vmdata = { 'name':str(vmname), 'vmid':str(vmid), 'ownerid':str(clientid) } + clientsdb[str(clientid)][str(vmid)] = vmdata + writeclientsdb(clientsdb) + + +def vmowner(vmid, vmname, verbose): + """ find the owner of the vm """ + clientsdb = readclientsdb() + try: + clientid = utils.get_rec(clientsdb, str(vmid))[0]['ownerid'] + clientname = clientsdb[str(clientid)]['name'] + except: + raise + clientid = '0' #unknown owner + clientname = 'unknown' + if verbose: + ioconfig.logger.info('clients> the owner of ' + str(vmid) + ' (' + vmname + ') is ' + str(clientid) + ' (' + clientname + ')') + return clientid + + +def readclientsdb(): + """ read client db """ + try: + with open('clients.json') as dbr: + clientsdb = json.load(dbr) + dbr.close() + except: + clientsdb = {} + ioconfig.logger.warning('clients> initializing...') + #writeclientsdb(clientsdb) + return clientsdb + + +def writeclientsdb(clientsdb): + """ write db """ + with open('clients.json', 'w') as dbw: + json.dump(clientsdb, dbw) + dbw.close() + diff --git a/config.ini.dist b/config.ini.dist new file mode 100644 index 0000000..1ac6eab --- /dev/null +++ b/config.ini.dist @@ -0,0 +1,28 @@ +[uwsgi] +socket = 127.0.0.1:5117 +workers = 3 + +[general] +logfile = log/proxmaster.log +adminuser = masteradmin@pve +apipass = CHANGEME +vmid_min = 1000 +vmid_max = 999999 +novnc_url = http://FQDN/noVNC + +[region_0] +name = CHANGEME +ipv4_min = 192.168.0.4 +ipv4_max = 192.168.0.254 + +[slave_0] +name = CHANGEME +masterip = 192.168.0.2 +password = CHANGEME +regionid = 0 + +[slave_1] +name = CHANGEME +masterip = 192.168.0.3 +password = CHANGEME +regionid = 0 diff --git a/grid.py b/grid.py new file mode 100644 index 0000000..7acb20b --- /dev/null +++ b/grid.py @@ -0,0 +1,396 @@ +#. -*- coding: utf-8 +# +# vdc + +#import site packages +import base64 +import json +import re +import datetime +import random +import netaddr + +#import local packages +import utils +import plugin +import ioconfig +import clientsdb +import journaldb + +logger = ioconfig.logger +config = ioconfig.parser + + +def sync(cached=True): + """ calls slave objects and mix their nodes in a common cluster grid """ + a = datetime.datetime.now() + grid_vmid_min = config.get('general', 'vmid_min') + grid_vmid_max = config.get('general', 'vmid_max') + real_grid = {'name':'real', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max } + cache_grid = {'name':'cache', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max } + regionselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'region' + r'\W*', x)] + for ri in range(len(regionselector)): + region_section = config.sections()[int(regionselector[ri])] + region_id = region_section.split("_")[1] + region_name = config.get(region_section, 'name') + region_range_min = config.get(region_section, 'ipv4_min') + region_range_max = config.get(region_section, 'ipv4_max') + slaveselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'slave' + r'\W*', x)] + real_region = { "id":region_id, "region":region_name, "ipv4_min":region_range_min, "ipv4_max":region_range_max } + cache_region = real_region.copy() #same region both in cache nad status + + for si in range(len(slaveselector)): + slave_section = config.sections()[int(slaveselector[si])] + slave_id = slave_section.split("_")[1] + slave_name = config.get(slave_section, 'name') + slave_masterip = config.get(slave_section, 'masterip') + slave_password = config.get(slave_section, 'password') + slave_regionid = config.get(slave_section, 'regionid') + enc_slave_password = base64.b64encode(slave_password.encode('ascii')) #encode base64 to avoid shoulder surfers + decoded_password = enc_slave_password.decode('utf-8') + real_slave = { "id":slave_id, "slave":slave_name, "masterip":slave_masterip, "password":decoded_password } + optional_slave = {} + cache_file = 'cache-slave-' + slave_id + '.json' + prefix = 'cache> [' + slave_id + '] ' + + # check if slave is in current region and include it in current dict if it is + if slave_regionid == region_id: + try: + #trying to connect to slave host + #vmlist = plugin.vmlist(slave_id, slave_masterip, enc_slave_password.decode('utf-8')) + proxobject = plugin.auth(slave_id, slave_masterip, enc_slave_password) + vmlist = plugin.vmlist(proxobject) + real_slave['alive'] = 'up' + logger.info(prefix + 'is up') + except: + #raise + #slave cant be read so it will be marked down. + real_slave['alive'] = 'down' + logger.warning(prefix + 'is down') + + if real_slave['alive'] == 'up': + #populate grid with vms then + for vm in vmlist: + #static parameters that CAN go to to cache: + vm_id = vm['vmid'] + vm_name = vm['name'] + vm_owner = clientsdb.vmowner(vm_id, vm_name, cached) #read clientsdb cache + static_vm = { "vmid":str(vm_id), "hostname":vm_name, 'type':vm['vmtype'], 'owner':vm_owner } + real_slave[str(vm_id)] = static_vm + #dynamic parameters that SHOULD NOT go to the cache: + dynamic_vm = { "uptime":vm['uptime'] } + optional_slave[str(vm_id)] = dynamic_vm + + #check current cache + cache_slave = real_slave.copy() #fallback to the current state + try: + with open(cache_file) as fcr: + cache_slave = json.load(fcr) + fcr.close() + except: + logger.info(prefix + 'does not exist in cache. Initializing...') + + if cache_slave['alive'] == 'up': + #slave was not down so it must be up... + cache_slave = update_cache(real_slave, cache_file, prefix, 'up') + logger.info(prefix + 'sync success o/') + else: + #if the slave was down before, compare the state before overwriting the cache + cache_slave['alive'] = 'up' #even if alive status in cache is still down we ignore it by forcing it to up + logger.info(prefix + 'was down') + #show the differences in log for manual (or maybe automatic at some point fixing) + findDiff(cache_slave, real_slave) + if cache_slave != real_slave: + logger.warning(prefix + 'cache != current status. please restore host!') + cache_slave = update_cache(cache_slave, cache_file, prefix, 'down') + else: + logger.info(prefix + 'cache == current status. host restored. o/') + cache_slave = update_cache(cache_slave, cache_file, prefix, 'up') + + #what to do with cache if host is down + if real_slave['alive'] == 'down': + try: + logger.warning(prefix + 'loading cache...') + with open(cache_file) as fscr: + cache_slave = json.load(fscr) + fscr.close() + logger.warning(prefix + '...done') + cache_slave = update_cache(cache_slave, cache_file, prefix, 'down') + except: + logger.error(prefix + 'sync failure!') + cache_slave = real_slave.copy() + #raise + + #we safely mix the dynamic ids now that we dont deal with cache anymore + mergedslave = utils.dict_merge({}, real_slave, optional_slave) + real_region[slave_id] = mergedslave + cache_region[slave_id] = cache_slave + #the region is finally included in the grid + real_grid[region_id] = real_region + cache_grid[region_id] = cache_region + + b = datetime.datetime.now() + real_grid["synctime"] = str(b-a) + #dump all data to json + WriteCache(cache_grid, 'grid-cache.json') + WriteCache(real_grid, 'grid-real.json') + if cached == True: + return cache_grid + else: + return real_grid + + +def update_cache(cachedata, cachefile, prefix, newstatus): + """ update administravite status """ + cachedata['alive'] = newstatus + WriteCache(cachedata, cachefile) + #TODO send mail + logger.info(prefix + 'administratively ' + newstatus) + return cachedata + + +def WriteCache(src_data, cache_file): + with open(cache_file, 'w') as fcw: + json.dump(src_data, fcw) + fcw.close() + + +def query_region(region_name): + """ translate region name to region id """ + grid_data = readcache() + + all_regions = [] + for element in grid_data: + try: + if str(element) == grid_data[element]['id']: + all_regions.append(element) + except: + continue + + for region in all_regions: + if grid_data[region]['region'] == region_name: + logger.info('grid> region ' + region_name + ' found') + return grid_data[region]['id'] + break + logger.error('grid> cant find region ' + region_name) + return "-1" + +def query_happiness(region_id): + """ analyzes grid data for the reuqested region and returns proposed slave_id, + based on a "happiness" factor. happiness means alive and free :) """ + grid_data = readcache() + grid_data = grid_data[str(region_id)] + + all_slaves = [] + for element in grid_data: + try: + if str(element) == grid_data[element]['id']: + all_slaves.append(element) + except: + continue + all_slaves = [ int(x) for x in all_slaves ] #convert values from str to int + + alive_slaves = [] + for slaveid in all_slaves: + if str(grid_data[str(slaveid)]['alive']) == 'up': + alive_slaves.append(slaveid) + logger.info('grid> alive slaves ' + str(alive_slaves)) + + #happy_slave = random.choice(alive_slaves) + if len(alive_slaves) < 1: + logger.error('grid> grid is full. add more slaves') + else: + happy_slave = 1 #TODO: analyze slaves and make informed decision. + logger.info('grid> ' + str(happy_slave) + ' selected') + return happy_slave + + +def generate_ipv4(region_id, how_many=1): + """ analyzes cached grid data and returns ip addresses for new machines. """ + grid_data = readcache() + ip_range_min = grid_data[str(region_id)]['ipv4_min'] + ip_range_max = grid_data[str(region_id)]['ipv4_max'] + region_ipset = netaddr.IPSet(netaddr.IPRange(ip_range_min, ip_range_max)) + region_ips = [] + for ip in region_ipset: + region_ips.append(ip) + ip_min = 0 + ip_max = len(region_ips) - 1 + tested_ips = [] #initialize ip cache + requested_ips = [] + all_ips = utils.get_rec(grid_data, 'ipaddr') + + for ips in range(int(how_many)): + counter = 0 + while True: + if counter == 50: + logger.error('grid> ip range full') + return None + else: + counter += 1 + + requested_ip_index = random.randint(ip_min, ip_max) + requested_ip = str(region_ips[requested_ip_index]) + + if requested_ip in tested_ips: + logger.warning('grid> ip address ' + str(requested_ip) + ' already tested. cache: ' + str(tested_ips)) + continue + + if requested_ip in requested_ips: + logger.warning('grid> ip address ' + str(requested_ip) + ' already generated') + tested_ips.append(requested_ip) + continue + + if requested_ip in all_ips: + position = used_ips.index(requested_ip) + logger.warning('grid> ip address ' + str(requested_ip) + ' already exist. location:' + str(position)) + tested_ips.append(requested_ip) + continue + else: + tested_ips = [] #clear ip cache + break + + logger.info('grid> ip address ' + requested_ip + ' selected') + requested_ips.append(requested_ip) + logger.info('grid> ip addresses ' + str(requested_ips) + ' selected') + return requested_ips + + +def generate_vmid(): + """ analyzes cached grid data and return proposed vmid for new machines """ + grid_data = readcache() + tested_vmids = [] #initialize id cache + id_min = grid_data['vmid_min'] + id_max = grid_data['vmid_max'] + all_vmid = utils.get_rec(grid_data, 'vmid') #get all vmid values from the nested grid + all_vmid = [ int(x) for x in all_vmid ] #convert values from str to int (its a vmid right?) + counter = 0 + while True: + if counter == 50: + logger.error('grid> ip range full') + return None + else: + counter += 1 + + requested_vmid = random.randint(int(id_min), int(id_max)) #max 90k machines + + if requested_vmid in tested_vmids: + logger.warning('grid> vmid ' + str(requested_vmid) + ' already tested. cache:' + str(tested_vmids)) + continue + + if requested_vmid in all_vmid: + position = all_vmid.index(requested_vmid) + logger.warning('grid> vmid ' + str(requested_vmid) + ' already exist. location:' + str(position)) + tested_vmids.append(requested_vmid) + else: + tested_vmids = [] #clear tested vmid cache + break + + logger.info('grid> vmid ' + str(requested_vmid) + ' selected') + return requested_vmid + + +def query_slave_data(slave_id): + """ read the cache for the requested slave_id """ + grid_data = readcache() + all_regions = [] + for element in grid_data: + try: + if str(element) == grid_data[element]['id']: + all_regions.append(element) + except: + continue + try: + for region in all_regions: + cslave = grid_data[region] + result_slave = cslave[str(slave_id)] #select the slave from all regions + if result_slave != None: + return result_slave + break + except: + raise + return {} + + +def query_vm(req_vmid): + """ returns slave_id and vm_type for the requested vmid """ + #read current state (no cache) + sync(False) + grid_data = readreal() + + #compare requested vmid to all vmid's from the grid + #TODO: maybe we should also check the owner somehow + all_vmid = utils.get_rec(grid_data, 'vmid') + target = int(0) + for running_vmid in all_vmid: + if str(req_vmid) == str(running_vmid): + target = req_vmid #=runn? + break + else: + continue + if target == 0: + logger.error('grid> vmid {} cannot be found.' + str(req_vmid)) + return "-1" + + region_id, slave_id = journaldb.getjnode(target) + try: + vm_type = grid_data[str(region_id)][str(slave_id)][str(target)]['type'] + except: + raise + + #we should know them by now. + return slave_id, vm_type + + +def findDiff(d1, d2, path=""): + for k in d1.keys(): + if not k in d2.keys(): + logger.warning('cache> ' + str(k) + ' as key not in d2') + else: + if type(d1[k]) is dict: + if path == "": + path = k + else: + path = path + "->" + k + findDiff(d1[k],d2[k], path) + else: + if d1[k] != d2[k]: + logger.warning('cache> ' + str(k) + ' ' + str(d1[k]) + ' [-]') + logger.warning('cache> ' + str(k) + ' ' + str(d2[k]) + ' [+]') + +def readcache(): + """ read the last saved cache and return its contents """ + try: + with open('grid-cache.json') as gridfile: + grid_data = json.load(gridfile) + gridfile.close() + except: + grid_data = {} + logger.error('cache> cannot read cache file') + return grid_data + + +def readreal(): + """ read the current state and return its contents """ + try: + with open('grid-real.json') as gridfile: + grid_data = json.load(gridfile) + gridfile.close() + resulttime = grid_data['synctime'] + logger.info('grid> sync for ' + resulttime) + except: + grid_data = {} + logger.error('cache> cannot read temp file') + return grid_data + + + +if __name__ == '__main__': + print(sync()) + #print(query_region('Plovdiv, Bulgaria')) + #print(query_happiness(0)) + #print(generate_ipv4(0,3)) + #print(generate_vmid()) + #print(query_slave_data(0)) + #print(query_vm(483039)) + diff --git a/ioconfig.py b/ioconfig.py new file mode 100644 index 0000000..60a7fc1 --- /dev/null +++ b/ioconfig.py @@ -0,0 +1,21 @@ +#. -*- coding: utf-8 +# +# ioconfig.py - read/write config/log etc. + +import configparser +import logging + +""" config reader """ +parser = configparser.ConfigParser() +parser.read('config.ini') + +""" log writer """ +logger = logging.getLogger('proxmaster') +logging.captureWarnings(True) +logger.setLevel(logging.DEBUG) +handler = logging.FileHandler(parser.get('general', 'logfile')) +handler.setLevel(logging.DEBUG) +#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) diff --git a/journaldb.py b/journaldb.py new file mode 100644 index 0000000..5edc56c --- /dev/null +++ b/journaldb.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 +# +# manage journaldb.json which is a table of vmid's as indexes update on vmcreate and +# values of region_id and slave_id. should support deletion of unused id's and be +# properly updated on vm migrations + +#site +import json + +#local +import ioconfig +import utils + +def createjnode(vmid, regionid, slaveid): + """ create new record into the journal. invoked on vm creation """ + journaldb = readjournal() + if str(vmid) in journaldb: + ioconfig.logger.warning('journal> overwriting id[{}] !'.format(vmid)) + else: + jnode = { str(vmid):{} } + journaldb.update(jnode) + + ioconfig.logger.info ('journal> r[{}] s[{}] -> id[{}]'.format(regionid, slaveid, vmid)) + jdata = { 'vmid':str(vmid), 'slaveid':str(slaveid), 'regionid':str(regionid) } + journaldb[str(vmid)] = jdata + writedb(journaldb) + + +def getjnode(vmid): + """ query the database for records with requested vmid. invoked on user commands """ + journaldb = readjournal() + try: + regionid = journaldb[str(vmid)]['regionid'] + slaveid = journaldb[str(vmid)]['slaveid'] + ioconfig.logger.info('journal> read: id[{}] -> r[{}] s[{}]'.format(vmid, regionid, slaveid)) + except: + ioconfig.logger.error('journal> invalid id[{}] !'.format(vmid)) + else: + return regionid, slaveid + + +def readjournal(): + """ read journal """ + try: + with open('journal.json') as dbr: + journaldb = json.load(dbr) + dbr.close() + except: + journaldb = {} + ioconfig.logger.warning('journal> initializing...') + return journaldb + + +def writedb(journaldb): + """ write journal """ + with open('journal.json', 'w') as dbw: + json.dump(journaldb, dbw) + dbw.close() + diff --git a/nginx_example_vhost.txt b/nginx_example_vhost.txt new file mode 100644 index 0000000..32f5b5b --- /dev/null +++ b/nginx_example_vhost.txt @@ -0,0 +1,10 @@ +server { + listen 80; + server_name EXAMPLE.com; + location / { + uwsgi_pass 127.0.0.1:5117; + include uwsgi_params; + uwsgi_param UWSGI_SCRIPT proxmaster; + uwsgi_param UWSGI_PYHOME /home/USER/proxmaster; + } +} diff --git a/novnc.py b/novnc.py new file mode 100644 index 0000000..610b2aa --- /dev/null +++ b/novnc.py @@ -0,0 +1,44 @@ +#. -*- coding: utf-8 +# +# novnc daemon spawner + +#import site packages +import shlex, subprocess + + +def spawn(target, options): + """ spawn """ + vnctarget = '{}:{} {}:{} '.format(target['listen_host'], target['listen_port'], target['target_host'], target['target_port']) + + a_options = '' + for key, value in options.items(): + if value == True: + c_option = '--{} '.format(key) + else: + c_option = '--{} {} '.format(key, value) + a_options += c_option + + command_line = 'python3 runwebsockify.py ' + a_options + vnctarget + args = shlex.split(command_line) + p = subprocess.Popen(args) + print('spawn!') + + +def spawn2(options): + """ spawn novnc daemon """ + print('daemon spawned') + novncd = threading.Thread(name='novncd', target=start_websockify, args=(options,)) + novncd.setDaemon(True) #daemonic ]:> + novncd.start() + print('stauts', novncd.isAlive()) + + +def start_websockify(options): + """ spawn websockify process """ + print(options) + server = websockify.WebSocketProxy(**options) + server.start_server() + print('daemon exited') + #while True: + # print('daemon') + diff --git a/plugin.py b/plugin.py new file mode 100644 index 0000000..99e82d7 --- /dev/null +++ b/plugin.py @@ -0,0 +1,301 @@ +#. -*- coding: utf-8 - +# required proxmox permissions: PVESysAdmin, PVEVMAdmin +# +# afx 2015-2016 + +# site +from proxmoxer import ProxmoxAPI +import base64 +import json +import time +import socket + +#local +import grid +import clientsdb +import journaldb +import utils +import ioconfig +import novnc + +def auth(slave_id, masterip=None, enc_password=None): + """ captures slave we want to auth from the cache and extract the credentials """ + adminuser = ioconfig.parser.get('general', 'adminuser') + if masterip is None: + result_slave = grid.query_slave_data(slave_id) + masterip = result_slave['masterip'] + enc_password = result_slave['password'] + adminpassword = base64.b64decode(enc_password).decode('ascii') + + #vendor specific + #connection = lib_proxmoxia.Connector(masterip) + #auth_token = connection.get_auth_token(adminuser, adminpassword) + #proxobject = lib_proxmoxia.Proxmox(connection) + proxobject = ProxmoxAPI(masterip, user=adminuser, password=adminpassword, verify_ssl=False) + + return proxobject + + +def vmlist(proxobject): + """ get vmlist """ + #we keep a single node proxmoxes so node id = 0 + #slave_name = proxobject.get('cluster/status')#'name'] + slave_name = proxobject.cluster.status.get()[0]['name'] + #query_kvm = proxobject.get('nodes/%s/qemu' % slave_name) + query_kvm = proxobject.nodes(slave_name).qemu.get() + query_lxc = proxobject.nodes(slave_name).lxc.get() + for kvm_dict in query_kvm: + kvm_dict['vmtype'] = 'kvm' + for lxc_dict in query_lxc: + lxc_dict['vmtype'] = 'lxc' + vmlist = query_kvm + query_lxc #merge machine list + return vmlist + + +def vmcreate(req): + """ create vm. returns JSON with data for whmcs """ + grid.sync() + + region_id = grid.query_region(req['region']) + if region_id == "-1": + logger.error('grid> no region found') + response = 'NO REGION FOUND' + return response + + slave_id = str(grid.query_happiness(region_id)) + vm_id = str(grid.generate_vmid()) + vm_ipv4 = grid.generate_ipv4(region_id, req['vps_ipv4']) + ipv4_dict = {} + ipidx = 0 + + vm_name = req['hostname'] + client_id = req['clientid'] + client_name = req['clientname'] + + proxobject = auth(slave_id) #we dont know the ip of slave_id so we leave the auth function to find it itself. + slave_name = proxobject.cluster.status.get()[0]['name'] + + #ioconfig.logger.info('grid[' + slave_name + ']> recieved data: %s, %s, %s, %s, %s', region_id, slave_id, vm_id, vm_ipv4, req) + for ip in vm_ipv4: + ipv4_dict[str(ipidx)] = str(ip) + ipidx += 1 + response = { 'status':'CREATING', 'vmid':vm_id, 'name':vm_name, 'password':'TODO', 'ipv4_0':vm_ipv4[0] } + + disk_filename = 'vm-' + vm_id + '-disk-1' + description = vm_name + ' (' + vm_id + ')\n' + description += 'owned by ' + client_name + ' (' + client_id + ')\n' + description += 'master ip: ' + vm_ipv4[0] + + #create partition + image_name = 'vm-' + vm_id + '-disk-0' + local_storage = proxobject.nodes(slave_name).storage('lvm') + local_storage.content.post(vmid=vm_id, + filename=image_name, + size=req['vps_disk'] + 'G') + + if req['vps_type'] == 'KVM': + create_result = proxobject.nodes(slave_name).qemu.post(vmid=vm_id, + name=vm_name, + sockets=1, + cores=req['vps_cpu'], + memory=req['vps_ram'], + virtio0='lvm:' + image_name, + ide1='skyblue:iso/' + req['vps_os'] + ',media=cdrom', + net0='e1000,bridge=pub', + onboot=1, + description=description) + if req['vps_type'] == 'LXC': + create_result = proxobject.nodes(slave_name).lxc.post(vmid=vm_id, + hostname=vm_name, + password=req['password'], + sockets=1, + cores=req['vps_cpu'], + memory=req['vps_ram'], + virtio0='lvm:' + image_name, + ip_address=vm_ipv4[0], + onboot=1, + description=description) + + #populate the client db and vm journal + client_id = req['clientid'] + client_name = req['clientname'] + clientsdb.addclient(vm_id, vm_name, client_id, client_name) + journaldb.createjnode(vm_id, region_id, slave_id) + + #start the machihe + time.sleep(7) #wait few seconds for the slave to prepare the machine for initial run + vmstart(vm_id) + return response + + +def vmstatus(vm_id): + """ returns the status of the machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> get status of %s %s' % (slave_name, vm_type, vm_id)) + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.current.get() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.current.get() + return result + + + +def vmstart(vm_id): + """ starts a machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> starting %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.start.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.start.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'STARTING' } + return response + + +def vmshutdown(vm_id): + """ acpi shutdown the machine.. """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> acpi shutdown %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.stop.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.stop.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'SHUTDOWN', 'vmid':vm_id } + return response + + +def vmstop(vm_id): + """ poweroff the machine.. """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> power off %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.stop.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.stop.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'STOPPING', 'vmid':vm_id } + return response + + +def vmshutdown(vm_id): + """ graceful stop """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> acpi shutdown sent to %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.shutdown.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.shutdown.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'ACPI SHUTDOWN', 'vmid':vm_id } + return response + + +def vmsuspend(vm_id): + """ suspend machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> suspending %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.suspend.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.suspend.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'SUSPEND', 'vmid':vm_id } + return response + + +def vmresume(vm_id): + """ resume machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> resuming %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.resume.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.resume.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'RESUME', 'vmid':vm_id } + return response + + +def vmvnc(vm_id): + """ invoke vnc ticket """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> invoking vnc ticket for %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + ticket = proxobject.nodes(slave_name).qemu(vm_id).vncproxy.post(websocket=1) + #socket = proxobject.nodes(slave_name).qemu(vm_id).vncwebsocket.get(port=ticket['port'], + # vncticket=ticket['ticket']) + if vm_type == 'lxc': + ticket = proxobject.nodes(slave_name).lxc(vm_id).vncproxy.post() + #socket = proxobject.nodes(slave_name).lxc(vm_id).vncwebsocket.get(port=ticket['port'], + # vncticket=ticket['ticket']) + + slaveip = grid.query_slave_data(slave_id)['masterip'] + #slaveport = socket['port'] + slaveport = ticket['port'] + listenport = str(int(slaveport) + 1000 + (int(slave_id) * 100)) #TODO: max 100 parallel connections/slave. + myip = getmyip() + + vnc_target = { 'target_host': slaveip, + 'target_port': slaveport, + 'listen_host': myip, + 'listen_port': listenport } + + vnc_options = { 'idle-timeout': 20, + 'verbose': True, + 'run-once': True } + + novnc.spawn(vnc_target, vnc_options) + + external_url = ioconfig.parser.get('general', 'novnc_url') + prefix = external_url + "/?host=" + myip + "&port=" + listenport + "&encrypt=0&true_color=1&password=" + ioconfig.logger.info('grid[{}]> {}'.format(slave_name, prefix + ticket['ticket'])) + + response = { 'status':'VNC', 'vmid':vm_id } + return response + + +def getmyip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("gmail.com",80)) + myip = s.getsockname()[0] + s.close + return myip + + +if __name__ == '__main__': + #internal module tests + time.sleep(30) + diff --git a/proxmaster.py b/proxmaster.py new file mode 100644 index 0000000..1642cef --- /dev/null +++ b/proxmaster.py @@ -0,0 +1,269 @@ +#. -*- coding: utf-8 - +# required proxmox permissions: PVESysAdmin, PVEVMAdmin +# +# afx 2015-2016 + +# import site packages +import logging +import falcon +import sys +import json +import urllib.parse + +#import local packages +import ioconfig +import grid +import plugin + +config = ioconfig.parser +logger = ioconfig.logger + +def welcome(): + """displays motd in log as welcome message""" + logger.info('###################################') + logger.info('# proxmaster ][ (c) 2015-2016 afx #') + logger.info('###################################') + +def apicheck(params): + """ compares request params for api key with the config file""" + try: + if params['apipass'] == config.get('general', 'apipass'): + status = True + response = 'OK' + else: + status = False + response = 'GET KEY DENIED' + logger.error('grid> read access denied. key mismatch') + except: + #raise + status = False + response = 'GET URL DENIED' + logger.error('grid> read access denied. url error?') + finally: + return (status, response) + +#API methods +class ClusterResource(object): + def on_get(self, req, resp): + """TEST ONLY. List cluster nodes. TEST ONLY""" + logger.info('grid> cache status') + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + resp.body = str(grid.sync()) + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + def on_post(self, req, resp): + """Create a cluster node, returns array of: status, vmid, pass, ipv4, """ + logger.info('grid> create ' + str(req.params)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmcreate(req.params)) + except: + logger.error('grid> create function cancelled') + raise + resp.status = falcon.HTTP_403 + response = 'CREATE ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class StatusResource(object): + def on_get(self, req, resp, vmid): + """ check vm status """ + logger.info('grid> status ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmstatus(vmid)) + except: + logger.error('grid> status error') + raise + resp.status = falcon.HTTP_403 + response = 'STATUS ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class DeleteResource(object): + def on_post(self, req, resp, vmid): + """ delete machine completely""" + logger.info('grid> delete ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + try: + resp.body = urllib.parse.urlencode(plugin.vmdelete(vmid)) + except: + logger.error('grid> delete error') + raise + resp.status = falcon.HTTP_403 + response = 'DELETE ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class ArchivateResource(object): + def on_post(self, req, resp, vmid): + """ Temporary suspend the instance """ + logger.info('grid> suspend ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmsuspend(vmid)) + except: + logger.error('grid> pause error') + raise + resp.status = falcon.HTTP_403 + response = 'PAUSE ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class UnArchiveResource(object): + def on_post(self, req, resp, vmid): + """ Unuspend the instance """ + logger.info('grid> resume ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmresume(vmid)) + except: + logger.error('grid> resume error') + raise + resp.status = falcon.HTTP_403 + response = 'RESUME ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class StartResource(object): + def on_post(self, req, resp, vmid): + """ Start the instance """ + logger.info('grid> start ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmstart(vmid)) + except: + logger.error('grid> start error') + #raise + resp.status = falcon.HTTP_403 + response = 'START ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + +class ShutdownResource(object): + def on_post(self, req, resp, vmid): + """ ACPI Shutdown the instance """ + logger.info('grid> shutdown ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmshutdown(vmid)) + #TODO: Try few times and then return proper status message + except: + logger.error('grid> shutdown error') + #raise + resp.status = falcon.HTTP_403 + response = 'SHUTDOWN ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + +class StopResource(object): + def on_post(self, req, resp, vmid): + """ Stop the instance """ + logger.info('grid> stop ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmstop(vmid)) + except: + logger.error('grid> stop error') + #raise + resp.status = falcon.HTTP_403 + response = 'STOP ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class VNCResource(object): + def on_post(self, req, resp, vmid): + """ Create a VNC link to the instance """ + apicheck_stat, apicheck_resp = apicheck(req.params) + logger.info('grid> vnc ' + str(vmid)) + if apicheck_stat: + try: + resp.status = falcon.HTTP_200 + resp.body = urllib.parse.urlencode(plugin.vmvnc(vmid)) + except: + logger.error('grid> vnc error') + raise + resp.status = falcon.HTTP_403 + response = 'VNC ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + +if __name__ == '__main__': + sys.exit("invoke proxmaster via uwsgi. thanks. bye. o/") + +#setup routes +wsgi_app = api = application = falcon.API() + +#display motd +welcome() +#logger.info('grid> sync') +#grid.sync() + +# setup routes +res_cluster = ClusterResource() +api.add_route('/instance', res_cluster) + +res_status = StatusResource() +api.add_route('/instance/{vmid}', res_status) + +res_delete = DeleteResource() +api.add_route('/instance/delete/{vmid}', res_delete) + +res_archivate = ArchivateResource() +api.add_route('/instance/archivate/{vmid}', res_archivate) + +res_unarchive = UnArchiveResource() +api.add_route('/instance/unarchive/{vmid}', res_unarchive) + +res_start = StartResource() +api.add_route('/instance/start/{vmid}', res_start) + +res_shutdown = ShutdownResource() +api.add_route('/instance/shutdown/{vmid}', res_shutdown) + +res_stop = StopResource() +api.add_route('/instance/stop/{vmid}', res_stop) + +res_vnc = VNCResource() +api.add_route('/instance/vnc/{vmid}', res_vnc) + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..12df3a9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +uwsgi +pyOpenSSL +requests +falcon +urllib +netaddr +proxmoxer +websockify diff --git a/runwebsockify.py b/runwebsockify.py new file mode 100644 index 0000000..9ad217c --- /dev/null +++ b/runwebsockify.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python + +import websockify + +websockify.websocketproxy.websockify_init() diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..34916c5 --- /dev/null +++ b/start.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# Log rotation +LOG_DIR=${HOME}/proxmaster/log +LOG_FILE="${LOG_DIR}/proxmaster.log" + +mkdir -p $LOG_DIR + +TIME=`date -u +%s` + +if [ -e $LOG_FILE ] ; then + mv ${LOG_FILE} ${LOG_FILE}.${TIME} && touch ${LOG_FILE} +else + touch ${LOG_FILE} +fi + +#startuwsgi instance +uwsgi config.ini diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..aae9bc2 --- /dev/null +++ b/utils.py @@ -0,0 +1,73 @@ +#. -*- coding: utf-8 +# +# helper functions + +from copy import deepcopy +import functools + +def dict_merge(target, *args): + """ Recursively merges mutiple dicts """ + # Merge multiple dicts + if len(args) > 1: + for obj in args: + dict_merge(target, obj) + return target + + # Recursively merge dicts and set non-dict values + obj = args[0] + if not isinstance(obj, dict): + return obj + for k, v in obj.items(): + if k in target and isinstance(target[k], dict): + dict_merge(target[k], v) + else: + target[k] = deepcopy(v) + return target + +def get_rec(search_dict, field): + """ + Takes a dict with nested lists and dicts, + and searches all dicts for a key of the field + provided. + """ + fields_found = [] + + for key, value in search_dict.items(): + if key == field: + fields_found.append(value) + + elif isinstance(value, dict): + results = get_rec(value, field) + for result in results: + fields_found.append(result) + + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + more_results = get_recursively(item, field) + for another_result in more_results: + fields_found.append(another_result) + + return fields_found + + +def gen_dict_extract(key, var): + if hasattr(var,'iteritems'): + for k, v in var.iteritems(): + if k == key: + yield v + if isinstance(v, dict): + for result in gen_dict_extract(key, v): + yield result + elif isinstance(v, list): + for d in v: + for result in gen_dict_extract(key, d): + yield result + + +def chained_get(dct, *keys): + SENTRY = object() + def getter(level, key): + return 'NA' if level is SENTRY else level.get(key, SENTRY) + return functools.reduce(getter, keys, dct) +