Source code for idyom.longTermModel

from idyom import data
from idyom import markovChain
from idyom import markovChainOrder0

import numpy as np
import pickle
from tqdm import tqdm
import math

VERBOSE = False	

[docs]class longTermModel(): """ Module implementing the Long Term Model from IDyOM, this model contains several Markov Chains of different orders weighted by their respective shanon entropy. :param viewPoint: viewPoint to use, cf. data.getViewPoints() :param maxOrder: maximal order of the models :param alphabetSize(optional): size of the alphabet, number of viewPoints value to take account in :type viewPoint: string :type maxOrder: int :type alphabetSize(optional): int """ def __init__(self, viewPoint, maxOrder=1, STM=False, init=None, evolutive=False): # ViewPoint to use self.viewPoint = viewPoint # maximum order if given self.maxOrder = maxOrder # to track if is LTM or STM self.STM = STM #Wether it's an evolutive model self.evolutive = evolutive # in order to compute model entropy directly from MC entropies self.entropies = {} if init is not None: maxOrder = len(init) if self.maxOrder is None: maxOrder = maxOrder // 2 # CHANGE IT TO maxOrder - 1, maybe else: maxOrder = self.maxOrder self.maxOrder = maxOrder if VERBOSE: print("The maximal order is:", self.maxOrder) # list contening different order markov chains self.models = [] for order in range(1, self.maxOrder+1): self.models.append(markovChain.markovChain(order, STM=self.STM, evolutive=evolutive)) self.modelOrder0 = markovChainOrder0.markovChainOrder0(STM=self.STM, evolutive=evolutive) self.benchmark = [0, 0, 0] def getObservations(self): ret = 0 for model in self.models: ret += model.getObservationsSum() return ret
[docs] def train(self, data, shortTerm=False, preComputeEntropies=False): """ Fill the matrix from data :param data: data to train from :type data: list of np.array or list of list of int """ if shortTerm is True: # training all the models self.modelOrder0.train([data[0][-1:]]) for i in range(len(self.models)): self.models[i].train([data[0][-self.models[i].order-1:]]) if self.models[i].usedScores == 0: if VERBOSE: print("The order is too high for these data, we stop the training here.") break return if VERBOSE: print("The maximal order is:", self.maxOrder) import time # training all the models self.modelOrder0.train(data) for i in range(len(self.models)): self.models[i].train(data, preComputeEntropies=preComputeEntropies) if self.models[i].usedScores == 0: if VERBOSE: print("The order is too high for these data, we stop the training here.") break
[docs] def getPrediction(self, sequence): """ Returns the probability distribution from a given state :param sequence: a sequence of viewPoint data, cf. data.getData(viewPoint) :type sequence: np.array(length) :return: dictionary | dico[z] = P(z|sequence) (float) """ alphabet = [] for model in self.models: alphabet.extend(model.alphabet) alphabet = list(set(alphabet)) alphabet.sort() dico = {} for z in alphabet: dico[str(z)] = self.getLikelihood(sequence, z) return dico
[docs] def getEntropyMax(self, state): """ Return the maximum entropy for an alphabet. This is the case where all element is equiprobable. :param state: state to compute from :type state: list or str(list) :return: maxEntropy (float) """ alphabetSize = np.count_nonzero(list(self.getPrediction(state).values())) maxEntropy = 0 for i in range(alphabetSize): maxEntropy -= (1/alphabetSize) * math.log(1/alphabetSize, 2) return maxEntropy
def getAlphabet(self): alphabet = [] for model in self.models: alphabet.extend(model.alphabet) return list(set(alphabet))
[docs] def getEntropy(self, state, genuine_entropies=False): """ Return shanon entropy of the distribution of the model from a given state :param state: state to compute from :type state: list or str(list) :param genuine_entropies: wether to use the genuine entropies (over the approx) (default False) :type genuine_entropies: bool :return: entropy (float) """ if not genuine_entropies: return self.mergeProbas(self.entropies[str(state)], self.entropies[str(state)]) P = self.getPrediction(state).values() if None in P: print("It's not possible to compute this entropy, we kill the execution.") print("state:",state) print("probabilities:", P) quit() entropy = 0 for p in P: if p != 0: entropy -= p * math.log(p, 2) return entropy
[docs] def getRelativeEntropy(self, state, genuine_entropies=False): """ Return the relative entropy H(m)/Hmax(m). It is used for weighting the merging of models without bein affected by the alphabet size. :param state: state to compute from :type state: list or str(list) :return: entropy (float) """ maxEntropy = self.getEntropyMax(state) if maxEntropy > 0: return self.getEntropy(state, genuine_entropies=genuine_entropies)/maxEntropy else: return 1
[docs] def getLikelihood(self, state, note): """ Returns the likelihood of a note given a state :param state: a sequence of viewPoint data, cf. data.getData(viewPoint) :param note: the interger or name of the note :type state: np.array(length) :type note: int or string :return: float value of the likelihood """ probas = [] weights = [] entropies = [] k = -1 for model in self.models: k += 1 # we don't want to take in account a model that is not capable of prediction if model.order <= len(state) and model.getLikelihood(str(list(state[-model.order:])), note) is not None: if model.getObservations(state[-model.order:]) is not None: probas.append(model.getLikelihood(state[-model.order:], note)) weights.append(model.getRelativeEntropy(state[-model.order:])) entropies.append(model.getEntropy(state[-model.order:])) # Order 0 probas.append(self.modelOrder0.getLikelihood(int(note))) weights.append(self.modelOrder0.getRelativeEntropy()) entropies.append(self.modelOrder0.getEntropy()) if probas == []: return None self.entropies[str(state)] = np.array(entropies) #print(probas) #print(np.array(entropies)) return self.mergeProbas(probas, np.array(entropies))
[docs] def mergeProbas(self, probas, weights, b=1): """ Merging probabilities from different orders, for now we use arithmetic mean :param probas: probabilities to merge :param weights: weights for the mean, should be get from normalized entropy :type probas: list or numpy array :type weights: list or numpy array :return: merged probabilities (float) """ # we inverse the entropies weights = (weights.astype(float)+np.finfo(float).eps)**(-b) # Doomy normalization for w in weights: if w < 0: weights += abs(min(weights)) break if np.sum(weights) == 0: weights = np.ones(len(weights)) weights = weights/np.sum(weights) ret = 0 for i in range(len(probas)): ret += probas[i]*weights[i] return ret
[docs] def sample(self, state): """ Return a element sampled from the model given the sequence S :param S: sequence to sample from :type S: list of integers :return: sampled element (int) """ alphabet = [] for model in self.models: alphabet.extend(model.alphabet) alphabet = list(set(alphabet)) alphabet.sort() distribution = [] # We reconstruct the distribution according to the sorting of the alphabet for elem in alphabet: distribution.append(self.getLikelihood(state, elem)) ret = int(np.random.choice(alphabet, p=distribution)) return ret
[docs] def generate(self, length): """ Implement a very easy random walk in order to generate a sequence :param length: length of the generated sequence (in elements, not beat so it depends on the quantization) :type length: int :return: sequence (np.array()) """ S = [] # We uniformly choose the first element S.append(int(np.random.choice(self.models[0].alphabet))) while len(S) < length and str([S[-1]]) in self.models[0].stateAlphabet : S.append(self.sample(S)) return S
[docs] def save(self, file): """ Save a trained model :param file: path to the file :type file: string """ f = open(file, 'wb') pickle.dump(self.__dict__, f, 2) f.close()
[docs] def load(self, path): """ Load a trained model :param path: path to the file :type path: string """ f = open(path, 'rb') tmp_dict = pickle.load(f) f.close() self.__dict__.update(tmp_dict)