Commit 43100e54 authored by tpetaja1's avatar tpetaja1
Browse files

parallel worker for TVGL

parent bde2e511
Loading
Loading
Loading
Loading

ParallelTVGL.py

0 → 100644
+193 −0
Original line number Diff line number Diff line

from TVGL import TVGL
import penalty_functions as pf
import numpy as np
import multiprocessing
import time
import traceback

MAX_ITER = 5000


def mp_parallel_tvgl((thetas, z0s, z1s, z2s, u0s, u1s, u2s,
                      emp_cov_mat, lambd, beta, rho, nju,
                      indexes, out_queue, prev_pipe, next_pipe,
                      proc_index, stopping_criteria),
                     last=False):
    try:
        iteration = 0
        n = len(indexes)
        if last:
            nn = n
            end = None
        else:
            nn = n - 1
            end = -1
        thetas_pre = []
        final_thetas = {}
        dimension = np.shape(thetas[0])[0]
        while iteration < MAX_ITER:  # and stopping_criteria is False:
            """ Theta Update """
            if next_pipe is not None:
                next_pipe.send((z2s[-1], u2s[-1]))
            if prev_pipe is not None:
                received = prev_pipe.recv()
                z2s[0], u2s[0] = received
            for j, i in zip(indexes[:end], range(nn)):
                a = (z0s[i] + z1s[i] + z2s[i] - u0s[i] - u1s[i] - u2s[i])/3
                at = a.transpose()
                m = nju*(a + at)/2 - emp_cov_mat[i]
                d, q = np.linalg.eig(m)
                qt = q.transpose()
                sqrt_matrix = np.sqrt(d**2 + 4/nju*np.ones(dimension))
                diagonal = np.diag(d) + np.diag(sqrt_matrix)
                thetas[i] = np.real(
                    nju/2*np.dot(np.dot(q, diagonal), qt))
                final_thetas[j] = thetas[i]
            if prev_pipe is not None:
                prev_pipe.send(thetas[0])
            if next_pipe is not None:
                received = next_pipe.recv()
                if received is None:
                    break
                thetas[-1] = received
            """ Z0 Update """
            for i in range(nn):
                z0s[i] = pf.soft_threshold_odd(thetas[i] + u0s[i], lambd, rho)
            """ Z1-Z2 Update """
            for i in range(1, n):
                a = thetas[i] - thetas[i-1] + u2s[i] - u1s[i-1]
                e = pf.group_lasso_penalty(a, 2*beta/rho)
                summ = thetas[i] + thetas[i-1] + u2s[i] + u1s[i-1]
                z1s[i-1] = 0.5*(summ - e)
                z2s[i] = 0.5*(summ + e)
            """ U0 Update """
            for i in range(nn):
                u0s[i] = u0s[i] + thetas[i] - z0s[i]
            """ U1-U2 Update """
            for i in range(1, n):
                u1s[i-1] = u1s[i-1] + thetas[i-1] - z1s[i-1]
                u2s[i] = u2s[i] + thetas[i] - z2s[i]
            """ Check stopping criteria """
            if iteration > 0:
                fro_norm = 0
                for i in range(nn):
                    dif = thetas[i] - thetas_pre[i]
                    fro_norm += np.linalg.norm(dif)
                if fro_norm < 1e-5:
                    stopping_criteria[proc_index] = True
            if all(criteria is True for criteria in stopping_criteria):
                if next_pipe is not None:
                    next_pipe.send(None)
                if prev_pipe is not None:
                    prev_pipe.send(None)
                break
            thetas_pre = list(thetas)
            iteration += 1
        out_queue.put((final_thetas, iteration))
    except Exception as e:
        traceback.print_exc()
        raise e


class ParallelTVGL(TVGL):

    def __init__(self, filename, blocks=10,
                 lambd=20, beta=20, processes=1):
        super(ParallelTVGL, self).__init__(filename,
                                           blocks, lambd,
                                           beta, processes)
        if self.processes > self.blocks:
            self.processes = self.blocks
        self.chunk = int(np.round(self.blocks/float(self.processes)))
        self.iteration = "n/a"

    def init_algorithm(self):
        self.results = multiprocessing.JoinableQueue()
        self.pipes = [multiprocessing.Pipe() for i in range(self.processes-1)]
        self.procs = []
        stopping_criteria = multiprocessing.Manager().list()
        for i in range(self.processes):
            stopping_criteria.append(False)
        for i in range(self.processes):
            if i == 0:
                p = multiprocessing.Process(target=mp_parallel_tvgl,
                                            args=((self.thetas[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.z0s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.z1s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.z2s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.u0s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.u1s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.u2s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.emp_cov_mat[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.lambd,
                                                   self.beta,
                                                   self.rho,
                                                   self.nju,
                                                   range(self.chunk * i, self.chunk*(i+1)+1),
                                                   self.results,
                                                   None,
                                                   self.pipes[i][0],
                                                   i,
                                                   stopping_criteria),))
            elif i == self.processes - 1:
                p = multiprocessing.Process(target=mp_parallel_tvgl,
                                            args=((self.thetas[self.chunk * i:],
                                                   self.z0s[self.chunk * i:],
                                                   self.z1s[self.chunk * i:],
                                                   self.z2s[self.chunk * i:],
                                                   self.u0s[self.chunk * i:],
                                                   self.u1s[self.chunk * i:],
                                                   self.u2s[self.chunk * i:],
                                                   self.emp_cov_mat[self.chunk * i:],
                                                   self.lambd,
                                                   self.beta,
                                                   self.rho,
                                                   self.nju,
                                                   range(self.chunk * i, self.blocks),
                                                   self.results,
                                                   self.pipes[i-1][1],
                                                   None,
                                                   i,
                                                   stopping_criteria), True))
            else:
                p = multiprocessing.Process(target=mp_parallel_tvgl,
                                            args=((self.thetas[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.z0s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.z1s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.z2s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.u0s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.u1s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.u2s[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.emp_cov_mat[self.chunk * i:self.chunk*(i+1)+1],
                                                   self.lambd,
                                                   self.beta,
                                                   self.rho,
                                                   self.nju,
                                                   range(self.chunk * i, self.chunk*(i+1)+1),
                                                   self.results,
                                                   self.pipes[i-1][1],
                                                   self.pipes[i][0],
                                                   i,
                                                   stopping_criteria),))
            self.procs.append(p)

    def run_algorithm(self, max_iter=10000):
        self.init_algorithm()
        start_time = time.time()
        for p in self.procs:
            p.start()
        results = {}
        for i in range(self.processes):
            result, iteration = self.results.get()
            results.update(result)
            self.results.task_done()
        self.results.join()
        for i in results:
            self.thetas[i] = results[i]
        for p in self.procs:
            p.join()
        self.iteration = iteration
        print self.iteration
        self.run_time = '{0:.3g}'.format(time.time() - start_time)
        self.thetas = [np.round(theta, 3) for theta in self.thetas]
+12 −9
Original line number Diff line number Diff line
@@ -29,18 +29,21 @@ class SerialTVGL(TVGL):
        self.z1_z2_update()

    def z0_update(self):
        for i in range(self.blocks):
            self.z0s[i] = pf.soft_threshold_odd(self.thetas[i] + self.u0s[i],
                                                self.lambd, self.rho)
        self.z0s = [pf.soft_threshold_odd(
            self.thetas[i] + self.u0s[i], self.lambd, self.rho)
                    for i in range(self.blocks)]

    def z1_z2_update(self):
        aa = [self.thetas[i] - self.thetas[i-1] + self.u2s[i] - self.u1s[i-1]
              for i in range(1, self.blocks)]
        ee = [pf.group_lasso_penalty(a, 2*self.beta/self.rho) for a in aa]
        #ee = pf.group_lasso_penaltys(aa, 2*self.beta/self.rho)
        for i in range(1, self.blocks):
            a = self.thetas[i] - self.thetas[i-1] + self.u2s[i] - self.u1s[i-1]
            e = pf.group_lasso_penalty(a, 2*self.beta/self.rho)
            self.z1s[i-1] = 0.5*(self.thetas[i-1] + self.thetas[i]
                                 + self.u1s[i-1] + self.u2s[i]) - 0.5*e
            self.z2s[i] = 0.5*(self.thetas[i-1] + self.thetas[i]
                               + self.u1s[i-1] + self.u2s[i]) + 0.5*e
        #    a = self.thetas[i] - self.thetas[i-1] + self.u2s[i] - self.u1s[i-1]
        #    e = pf.group_lasso_penalty(a, 2*self.beta/self.rho)
            summ = self.thetas[i-1] + self.thetas[i] + self.u1s[i-1] + self.u2s[i]
            self.z1s[i-1] = 0.5*(summ - ee[i-1])
            self.z2s[i] = 0.5*(summ + ee[i-1])

    def u_update(self):
        for i in range(self.blocks):
+46 −18
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ class TVGL(object):
        self.real_thetas = [0] * self.blocks
        self.read_data(filename)
        self.rho = self.get_rho()
        self.max_step = 0.1
        self.lambd = lambd
        self.beta = beta
        print "Rho: %s" % self.rho
@@ -78,34 +79,58 @@ class TVGL(object):
        dh = None

    def get_rho(self):
        if self.obs % 3 == 0:
            return self.obs / 3 + 1
        else:
            return np.ceil(float(self.obs) / float(3))
        #if self.obs % 3 == 0:
        #    return self.obs / 3 + 1
        #else:
        #    return np.ceil(float(self.obs) / float(3))
        return float(self.obs + 0.1) / float(3)
        #return 2*self.obs

    def adjust_rho(self):
        step = 1/float(self.nju*2)
        step = max(self.max_step, step/1.1)
        self.rho = float(2*self.obs*step)/float(3)
        #self.rho = max(float(self.obs + 0.1) / float(3), self.rho/1.1)
        #if 3*self.rho - self.obs <= 0.01:
        #    return
        #self.rho = (np.exp(float(-self.iteration)/100) + self.obs)/3
        #if self.rho == self.max_rho:
        #    return
        #self.rho = min(self.rho * 1.1, self.max_rho)
        self.nju = float(self.obs)/float(3*self.rho)
        print "Rho: %s" % self.rho
        print "Nju: %s" % self.nju

    def run_algorithm(self, max_iter=10000):
        self.init_algorithm()
        self.iteration = 0
        stopping_criteria = False
        thetas_pre = []
        start_time = time.time()
        while self.iteration < max_iter and stopping_criteria is False:
            if self.iteration == 20:
            if self.iteration % 500 == 0 or self.iteration == 1:
                print "\n*** Iteration %s ***" % self.iteration
                print "Time passed: {0:.3g}s".format(time.time() - start_time)
                print "Rho: %s" % self.rho
                print "Nju: %s" % self.nju
                print "Step: {0:.3f}".format(1/(2*self.nju))
            if self.iteration % 500 == 0 or self.iteration == 1:
                s_time = time.time()
            self.theta_update()
            if self.iteration == 20:
                print "Theta update: {0:.3g}".format(time.time() - s_time)
            if self.iteration == 20:
            if self.iteration % 500 == 0 or self.iteration == 1:
                print "Theta update: {0:.3g}s".format(time.time() - s_time)
            if self.iteration % 500 == 0 or self.iteration == 1:
                s_time = time.time()
            self.z_update()
            if self.iteration == 20:
                print "Z-update: {0:.3g}".format(time.time() - s_time)
            if self.iteration == 20:
            if self.iteration % 500 == 0 or self.iteration == 1:
                print "Z-update: {0:.3g}s".format(time.time() - s_time)
            if self.iteration % 500 == 0 or self.iteration == 1:
                s_time = time.time()
            self.u_update()
            if self.iteration == 20:
                print "U-update: {0:.3g}".format(time.time() - s_time)
            if self.iteration % 500 == 0 or self.iteration == 1:
                print "U-update: {0:.3g}s".format(time.time() - s_time)
            """ Check stopping criteria """
            if self.iteration == 20:
            if self.iteration % 500 == 0 or self.iteration == 1:
                s_time = time.time()
            if self.iteration > 0:
                fro_norm = 0
@@ -116,8 +141,8 @@ class TVGL(object):
                    stopping_criteria = True
            thetas_pre = list(self.thetas)
            self.iteration += 1
            if self.iteration % 500 == 0:
                print "Iteration %s" % self.iteration
            #print "iter %s" % self.iteration
            #self.adjust_rho()
        self.run_time = "{0:.3g}".format(time.time() - start_time)
        self.final_tuning(stopping_criteria, max_iter)

@@ -133,13 +158,16 @@ class TVGL(object):
    def terminate_pools(self):
        pass

    def init_algorithm(self):
        pass

    def final_tuning(self, stopping_criteria, max_iter):
        self.thetas = [np.round(theta, 3) for theta in self.thetas]
        self.terminate_pools()
        if stopping_criteria:
            print "Iterations to complete: %s" % self.iteration
            print "\nIterations to complete: %s" % self.iteration
        else:
            print "Max iterations (%s) reached" % max_iter
            print "\nMax iterations (%s) reached" % max_iter

    def temporal_deviations(self):
        deviations = np.zeros(self.blocks - 1)
+34 −12
Original line number Diff line number Diff line
@@ -4,25 +4,47 @@ import numpy as np

def soft_threshold_odd(a, lambd, rho):
    dimension = np.shape(a)[0]
    e = np.ones((dimension, dimension))
    for i in range(dimension):
        for j in range(dimension):
            if i != j:
                if abs(a[i, j]) <= lambd/rho:
                    e[i, j] = 0
                else:
                    e[i, j] = np.sign(a[i, j])*(
    e = np.eye(dimension, dimension)
    for i in xrange(dimension - 1):
        for j in range(i + 1, dimension):
            if abs(a[i, j]) > lambd/rho:
                result = np.sign(a[i, j])*(
                    abs(a[i, j]) - lambd/rho)
                e[i, j] = result
                e[j, i] = result
    return e


def soft_threshold_odds(aa, lambd, rho):
    dimension = np.shape(aa[0])[0]
    ee = [np.eye(dimension, dimension) for i in range(len(aa))]
    for a, e in zip(aa, ee):
        for i in range(dimension - 1):
            for j in xrange(i + 1, dimension):
                if abs(a[i, j]) > lambd/rho:
                    result = np.sign(a[i, j])*(
                        abs(a[i, j]) - lambd/rho)
                    e[i, j] = result
                    e[j, i] = result
    return ee


def group_lasso_penalty(a, nju):
    dimension = np.shape(a)[0]
    e = np.zeros((dimension, dimension))
    for j in range(dimension):
        l2_norm = np.linalg.norm(a[:, j])
        if l2_norm <= nju:
            e[:, j] = np.zeros(dimension)
        else:
        if l2_norm > nju:
            e[:, j] = (1 - nju/l2_norm)*a[:, j]
    return e


def group_lasso_penaltys(aa, nju):
    dimension = np.shape(aa[0])[0]
    ee = [np.zeros((dimension, dimension)) for i in range(len(aa))]
    for a, e in zip(aa, ee):
        for j in range(dimension):
            l2_norm = np.linalg.norm(a[:, j])
            if l2_norm > nju:
                e[:, j] = (1 - nju/l2_norm)*a[:, j]
    return ee
+8 −0
Original line number Diff line number Diff line
@@ -6,6 +6,8 @@ from SerialTVGL import SerialTVGL
from AsyncProTVGL import AsyncProTVGL
from MultiProTVGL import MultiProTVGL
from ProcTVGL import ProcTVGL
from LastTVGL import LastTVGL
from ParallelTVGL import ParallelTVGL
from StaticGL import StaticGL


@@ -30,6 +32,12 @@ if __name__ == "__main__" and len(sys.argv) > 1:
    elif algo_type == "proc":
        algorithm = ProcTVGL(filename, blocks,
                             lambd, beta, processes=int(sys.argv[6]))
    elif algo_type == "last":
        algorithm = LastTVGL(filename, blocks,
                             lambd, beta, processes=int(sys.argv[6]))
    elif algo_type == "parallel":
        algorithm = ParallelTVGL(filename, blocks,
                                 lambd, beta, processes=int(sys.argv[6]))
    elif algo_type == "static":
        algorithm = StaticGL(filename, blocks,
                             lambd, processes=int(sys.argv[5]))