精度损失警告与运行速度问题

# -*- coding: utf-8 -*-
import taichi as ti
import numpy as np
import random
import time
ti.init()

@ti.data_oriented
class Population:
    def __init__(self, min_range, max_range, dim, rounds, size, object_func):
        self.min_range = ti.field(ti.f32, dim)
        self.max_range = ti.field(ti.f32, dim)
        self.min_range = min_range #list形式,输入各参数下界
        self.max_range = max_range #list形式,输入各参数上界
        self.dimension = dim #问题维数,即参数个数
        self.rounds = rounds #迭代轮次
        self.size = size #种群大小
        self.G = 1 #代数
        self.CR = 1 #交叉率
        self.PI = np.pi
        self.get_object_function_value = object_func #目标函数
        self.AG = ti.Vector.field(dim, ti.f32, size)
        self.BG = ti.Vector.field(dim, ti.f32, size)
        self.best_x=ti.Vector.field(self.dimension, ti.f32, self.rounds+1)
        for i in range(size):
            self.AG[i] = np.array(self.max_range)-(np.array(self.max_range)-np.array(self.min_range))*random.random()
            self.BG[i] = (np.array(self.max_range)-np.array(self.min_range))*random.random()+np.array(self.min_range)
        self.Aobject_function_values = ti.field(ti.f32, size)
        self.Bobject_function_values = ti.field(ti.f32, size)
        self.AG_next_1=ti.Vector.field(self.dimension, ti.f32, self.size)
        
        self.BG_next_1=ti.Vector.field(self.dimension, ti.f32, self.size)
        self.AG_next_2=ti.Vector.field(self.dimension, ti.f32, self.size)
        self.BG_next_2=ti.Vector.field(self.dimension, ti.f32, self.size)
        self.XG_next=ti.Vector.field(self.dimension, ti.f32, self.size)
        self.value = ti.field(ti.f32, size)
        self.Gmin = ti.field(ti.f32, rounds)
    
    @ti.kernel
    def initial_AB(self): #初始化种群
        for i in self.AG:
            self.Aobject_function_values[i] = self.get_object_function_value(self.AG[i])
            self.Bobject_function_values[i] = self.get_object_function_value(self.BG[i])

    #@ti.kernel
    def mutate_A(self): #A种群变异
        for i in range(self.size):
            r0 = 0
            r1 = 0
            while r0 == r1 or  r0 == i or r1 == i: #如果相等,重新生成
                r0 = random.randint(0, self.size-1) #生成0-size-1的随机数
                r1 = random.randint(0, self.size-1)
            F1=0.4*(ti.cos((self.G-1)*self.PI/self.rounds)+1)+0.1 #自适应种群A变异率
            son = ti.Vector([0 for k in range(self.dimension)])
            # for j in ti.static(range(self.dimension)):
            #     son[j] = self.AG[r0][j] + F1*(self.best_x[self.G-1][j] - self.AG[r1][j])
            son=np.array(self.AG[r0])+F1*(np.array(self.best_x[self.G-1])-np.array(self.AG[r1]))
            for j in ti.static(range(self.dimension)):
                if son[j] > self.min_range[j] and son[j] < self.max_range[j]:
                    self.AG_next_1[i][j]=son[j]
                else:
                    self.AG_next_1[i][j]=(self.max_range[j]-self.min_range[j])*random.random()+self.min_range[j]
    

    #@ti.kernel
    def mutate_B(self): #B种群变异
        for i in range(self.size):
            r0 = random.randint(0, self.size-1)
            while r0 == i: #如果相等,重新生成
                r0 = random.randint(0, self.size-1) #生成0~size-1的随机数
            F2=0.5*(2-np.power(2, 1-self.rounds/self.G/self.G)) #自适应种群B变异率
            son = ti.Vector([0 for k in range(self.dimension)])
            son=random.random()*np.array(self.AG[r0])+F2*(np.array(self.best_x[self.G-1])-np.array(self.BG[r0]))
            for j in ti.static(range(self.dimension)):
                if son[j] > self.min_range[j] and son[j] < self.max_range[j]:
                    self.BG_next_1[i][j]=son[j]
                else:
                    self.BG_next_1[i][j]=(self.max_range[j]-self.min_range[j])*random.random()+self.min_range[j]
    #@ti.kernel
    def cross_AB(self): #交叉
        self.CR=0.4*(ti.cos((self.G-1)*self.PI/self.rounds-self.PI)+1) #自适应交叉率
        for i in range(self.size):
            s=[x for x in range(self.dimension)]
            randx=random.sample(s,k=self.dimension)
            for j in ti.static(range(self.dimension)):
                tmp=random.random()
                if tmp > self.CR and randx[0] != j:
                    self.AG_next_2[i][j]=self.AG[i][j]
                    self.BG_next_2[i][j]=self.BG[i][j]
                else:
                    self.AG_next_2[i][j]=self.AG_next_1[i][j]
                    self.BG_next_2[i][j]=self.BG_next_1[i][j]
    @ti.kernel
    def choose(self): #选择最优解
        for i in range(self.size):
            Avalue = ti.Vector([0 for k in range(3)])
            best = self.get_object_function_value(self.AG[i])
            self.XG_next[i] = self.AG[i]
            v1 = self.get_object_function_value(self.AG_next_1[i])
            v2 = self.get_object_function_value(self.AG_next_2[i])
            if best > v1:
                best = v1
                self.XG_next[i] = self.AG_next_1[i]
            if best > v2:
                best = v2
                self.XG_next[i] = self.AG_next_2[i]
            b1 = self.get_object_function_value(self.BG[i])
            b2 = self.get_object_function_value(self.BG_next_1[i])
            b3 = self.get_object_function_value(self.BG_next_2[i])
            if best > b1:
                best = b1
                self.XG_next[i] = self.BG[i]
            if best > b2:
                best = b2
                self.XG_next[i] = self.BG_next_1[i]
            if best > b3:
                best = b3
                self.XG_next[i] = self.BG_next_2[i]
        for i in range(self.size):
            self.value[i]=self.get_object_function_value(self.XG_next[i])


    def evolution(self): #进化
        '''
        ******************* 初始化.**********************************************
        '''
        self.initial_AB()
        startP = time.time()
        best_value=np.inf
        best_x=np.empty(shape=(self.rounds+1,self.dimension))
        Abest_index = np.argmin(self.Aobject_function_values.to_numpy())
        Bbest_index = np.argmin(self.Bobject_function_values.to_numpy())
        if self.Aobject_function_values[Abest_index] < self.Bobject_function_values[Bbest_index]:
            best_x[0] = self.AG[Abest_index].to_numpy()
        else:
            best_x[0] = self.BG[Bbest_index].to_numpy()
    
        
        '''
        ******************* 开始进化.**********************************************
        '''
        
        self.G=1
        while self.G <= self.rounds:
            '''
            ******************* 变异过程.**********************************************
            '''
            '''
            *** 种群A变异过程.****
            '''
            self.mutate_A()
            '''
            *** 种群B变异过程.****
            '''
            self.mutate_B()
            '''
          ******************* 交叉过程.**********************************************
            '''
            self.cross_AB()
            '''
          ******************* 选择过程.**********************************************
            '''         
            self.choose()
            '''
          ******************* 获取最优解.**********************************************
            '''
            [value_min,pos_min]=[np.min(self.value.to_numpy()),np.argmin(self.value.to_numpy())]
            self.Gmin[self.G-1]=value_min
            if self.G >= 2:
                value_min=min(value_min,self.Gmin[self.G-2])
            best_value=min(value_min,best_value)
            best_x[self.G]=self.XG_next[pos_min].to_numpy()
            print('Generation=',self.G,'\t','min_value= ',best_value)
            # if best_value < self.accuracy: #收敛性检验
            #     print('The calibration took ',time.time() - startP,' seconds')
            #     print('CF calibration finished, please check xml file.')
            #     return best_x[G]
            self.G=self.G+1 
        print('The optimization took ',time.time() - startP,' seconds')
        [value_min,pos_min]=[np.min(self.Gmin.to_numpy()),np.argmin(self.Gmin.to_numpy())]
        return best_x[pos_min+1] #返回最优解


@ti.func
def SCH(x) -> ti.f32:
    sum = 0
    for i in ti.static(range(x.n)):
        sum = sum - x[i]*ti.sin(ti.sqrt(abs(x[i])))
    return sum

min_bound=([-500.0 for i in range(30)])
max_bound=([500.0 for i in range(30)])
p=Population(min_bound,max_bound,30, 100, 30,SCH)
p.evolution()

各位老师好:
我在用太极编写一个启发式算法求最小值的程序,以上为代码,在运行的过程中,首先会报一系列的精度损失警告,而且该程序是并行化的,但却比我直接编写的相同的算法代码(不牵涉taichi)要慢很多(80s,8s),我检查了代码,但找不出问题在哪儿,想问下各位老师是什么原因,感谢!

精度问题是SCH那里应该是sum=0.0。性能的话我看你代码里有不少地方没用kernel,这些代码不会被并行,以及你用了不少numpy的东西,这些在跟太极数据类型转换的时候也会有性能损失

2 Likes

Hi @Ming_CHEN,

  1. 精度问题,确实如 @Vineyo 所说
@ti.func
def SCH(x) -> ti.f32:
    sum = 0 # 精度问题出在这里
    for i in ti.static(range(x.n)):
        sum = sum - x[i]*ti.sin(ti.sqrt(abs(x[i])))
    return sum

使用ti.kernelti.func两个修饰器修饰的函数属于Taichi scope,这部分代码会被提前编译好,函数内的类型会根据你的初始化提前确定。
sum=0,太极编译器就认为sum是一个i32 sum - x[i]*ti.sin(ti.sqrt(abs(x[i])))会得到一个float类型的数字,然后赋值给sum时就会导致精度缺失。

  1. 代码的速度
    首先你可以设置ti.init(arch=ti.gpu),然后将计算中可以并行的部分尽可能的放到Taichi scope里处理。
    还有就是你可以评估一下性能的瓶颈在哪些地方,然后做有目的的代码优化。可以参考太极图形课:here
1 Like

好的,谢谢老师

谢谢!