Source code for DLITE.ManualTracingMultiple

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import numpy.linalg as la
import collections
from scipy import stats
from matplotlib import cm
import os
import matplotlib.patches as mpatches
from collections import defaultdict
import pylab
import pandas as pd
from .cell_describe import colony
from .ManualTracing import ManualTracing


[docs]class ManualTracingMultiple: def __init__(self, numbers, name_first=None, name_last=None, type=None): """ Class to handle colonies at mutliple time points that have been manually traced out using NeuronJ Numbers is a list containing start and stop index e.g [2,4] of the files labeled as - 'MAX_20170123_I01_003-Scene-4-P4-split_T0.ome.txt' ^ this number changes - 0,1,2,3,..30 """ if not type: self.name_first = 'MAX_20170123_I01_003-Scene-4-P4-split_T' self.name_last = '.ome.txt' elif type == 'small_v2': self.name_first = '20170123_I01_003.czi - 20170123_I01_00300' self.name_last = '.txt' elif type == 'small_v3': self.name_first = '7_20170123_I01_003.czi - 20170123_I01_00300' self.name_last = '.txt' elif type == 'small_v4': self.name_first = '002_20170123_I01_002.czi - 20170123_I01_00200' self.name_last = '.txt' elif type == 'large_v1': self.name_first = 'AVG_20170124_I07_001-Scene-4-P2-100' self.name_last = '.txt' else: self.name_first = name_first self.name_last = name_last names = [] for i in range(numbers[0], numbers[-1], 1): names.append(self.name_first + str(i) + self.name_last) names.append(self.name_first + str(numbers[-1]) + self.name_last) self.names = names
[docs] def get_x_y_data(self, number): """ Retrieve X and Y co-ordinates of a colony at a time point specified by number """ if number < 10 and self.name_first == 'AVG_20170124_I07_001-Scene-4-P2-100': file = self.name_first + str(0) + str(number) + self.name_last elif number < 10 and self.name_first == '20170123_I01_003.czi - 20170123_I01_00300': file = self.name_first + str(0) + str(number) + self.name_last elif number < 10 and self.name_first == '7_20170123_I01_003.czi - 20170123_I01_00300': file = self.name_first + str(0) + str(number) + self.name_last elif number < 10 and self.name_first == '002_20170123_I01_002.czi - 20170123_I01_00200': file = self.name_first + str(0) + str(number) + self.name_last else: file = self.name_first + str(number) + self.name_last with open(file, 'r') as f: a = [l.split(',') for l in f] x, y, X, Y = [], [], [], [] for num in a: if len(num) == 2: x.append(int(num[0])) y.append(int(num[1].strip('\n'))) if len(num) == 1: X.append(x) Y.append(y) x = [] y = [] X.append(x) Y.append(y) X.pop(0) Y.pop(0) return X, Y
[docs] def get_nodes_edges_cells(self, number): """ Get nodes, edges and cells at time point number these nodes, edges and cells do not have any labels CHECK - cutoff values used, need to go to every frame and check to see if cutoff works or not """ x, y = self.get_x_y_data(number) ex = ManualTracing(x, y) if self.name_first == 'MAX_20170123_I01_003-Scene-4-P4-split_T': cutoff = 14 if 0 <= number <= 3 \ or 5 <= number <= 7 \ or 10 <= number <= 11 \ or number == 13 or number == 17 else \ 16 if 14 <= number <= 15 \ or 18 <= number <= 20 or 22 <= number <= 30 else \ 17 if 8 <= number <= 9 or number == 16 \ or number == 16 else \ 20 if number == 4 else 12 elif self.name_first == '20170123_I01_003.czi - 20170123_I01_00300': cutoff = 20 elif self.name_first == '7_20170123_I01_003.czi - 20170123_I01_00300': cutoff = 15 elif self.name_first == '002_20170123_I01_002.czi - 20170123_I01_00200': cutoff = 15 else: cutoff = 10 print('File %d used a Cutoff value ------> %d' % (number, cutoff)) nodes, edges, new = ex.cleanup(cutoff) cells = ex.find_cycles(edges) return nodes, edges, cells
[docs] def initial_numbering(self, number0): """ Assign random labels to nodes, edges and cells in the colony specified by number0 Returns labeled nodes, edges and cells. Also returns a dictionary defined as {node.label: edges connected to node label, vectors of edges connected to node label} """ # Get the list of nodes for name0 temp_nodes, edges, initial_cells = self.get_nodes_edges_cells(number0) def func(p, common_node): # This function outputs the absolute angle (0 to 360) that the edge makes with the horizontal if p.node_a == common_node: this_vec = np.subtract(p.node_b.loc, p.node_a.loc) else: this_vec = np.subtract(p.node_a.loc, p.node_b.loc) return this_vec # Create an empty dictionary old_dictionary = defaultdict(list) for j, node in enumerate(temp_nodes): # Give every node a label -> in this case we're arbitrarily givig labels as we loop through node.label = j sort_edges = node.edges this_vec = [func(p, node) for p in sort_edges] old_dictionary[node.label].append(sort_edges) old_dictionary[node.label].append(this_vec) for k, c in enumerate(initial_cells): c.label = k for p, ed in enumerate(edges): ed.label = p return temp_nodes, old_dictionary, initial_cells, edges
[docs] def track_timestep(self, old_colony, old_dictionary, number_now): """ We want to output a dictionary that contains a list of edges in number_now that is the same (almost) as the edges in old_colony ----------- Parameters ----------- old_colony - colony instance for the previous time step old_dictionary - dictionary for the colony instance in old_colony {node.label: edges, vectors} number_now - number of current time point """ def func(p, common_node): # This function outputs the absolute angle (0 to 360) that the edge makes with the horizontal if p.node_a == common_node: this_vec = np.subtract(p.node_b.loc, p.node_a.loc) else: this_vec = np.subtract(p.node_a.loc, p.node_b.loc) return this_vec def py_ang(v1, v2): """ Returns the angle in degrees between vectors 'v1' and 'v2' """ cosang = np.dot(v1, v2) sinang = la.norm(np.cross(v1, v2)) return np.rad2deg(np.arctan2(sinang, cosang)) # Get list of nodes and edges for every time point old_nodes = old_colony.tot_nodes old_edges = old_colony.tot_edges old_cells = old_colony.cells # Get list of nodes and edges for names_now that don't have labels now_nodes, now_edges, now_cells = self.get_nodes_edges_cells(number_now) # Find the node in now_nodes that is closest to a node in old_nodes and give same label for j, prev_node in enumerate(old_nodes): # Give the same label as the previous node closest_new_node = min([node for node in now_nodes], key=lambda p: np.linalg.norm(np.subtract(prev_node.loc, p.loc))) # Check that the edge vectors on this node are similar to the edge vectors on the prev node if len(closest_new_node.edges) == 1: # Want to check that angles are similar if py_ang(closest_new_node.tension_vectors[0], prev_node.tension_vectors[0]) < 15: closest_new_node.label = prev_node.label else: # If its connected to 3 edges, closest node is fine. only single edge nodes are problematic closest_new_node.label = prev_node.label upper_limit = max([n.label for n in now_nodes if n.label != []]) upper_edge_limit = max([e.label for e in old_edges if e.label != []]) new_dictionary = defaultdict(list) total_now_edges = [] special_labels = [] for node in now_nodes: if node.label: if node.label < upper_limit + 1: try: old_edges_node = old_dictionary[node.label][0] old_angles = old_dictionary[node.label][1] temp_edges = [] new_vec = [func(p, node) for p in node.edges] if len(old_angles) == len(node.edges): for old_e in old_angles: v1_v2_angs = [py_ang(old_e, nw) for nw in new_vec] min_ang = min(v1_v2_angs) for ed in node.edges: vec = func(ed, node) if py_ang(old_e, vec) == min_ang: closest_edge = ed temp_edges.append(closest_edge) if closest_edge not in total_now_edges: total_now_edges.append(closest_edge) if len([item for item, count in collections.Counter(temp_edges).items() if count > 1]) != 0: temp_edges = [] new_vecs = [func(p, node) for p in temp_edges] for k, p in zip(old_edges_node, temp_edges): if not p.label: labels = [e.label for e in total_now_edges] if k.label not in labels: p.label = k.label new_dictionary[node.label].append(temp_edges) new_dictionary[node.label].append(new_vecs) if len([e.label for e in old_edges_node]) != len([e.label for e in node.edges]): special_labels.append(node.label) # print('Possible change in topology') # print('Node label', node.label, 'old edge labels', # [e.label for e in old_edges_node], 'New edge labels', # [e.label for e in node.edges], end=' ') except: pass if upper_limit < 1000: count = upper_limit + 1 else: count = upper_limit + 1 if upper_edge_limit < 1000: count_edge = upper_edge_limit + 1 else: count_edge = upper_edge_limit + 1 for node in now_nodes: check = 0 if not node.label and node.label != 0: node.label = count count += 1 check = 1 for e in node.edges: if not e.label and e.label != 0: e.label = count_edge count_edge += 1 check = 1 if check == 1: temp_edges = node.edges new_vecs = [func(p, node) for p in temp_edges] if len(new_dictionary[node.label]) == 2: new_dictionary[node.label].pop() new_dictionary[node.label].pop() new_dictionary[node.label].append(temp_edges) new_dictionary[node.label].append(new_vecs) if node.label in special_labels: temp_edges = node.edges new_vecs = [func(p, node) for p in temp_edges] new_dictionary[node.label].pop() new_dictionary[node.label].pop() new_dictionary[node.label].append(temp_edges) new_dictionary[node.label].append(new_vecs) set1 = set(old_dictionary) set2 = set(new_dictionary) combined_dict = defaultdict(list) # Find the labels that are common between the 2 lists and return a dictionary of the form # {label, old_edges, new_edges} for label in set1.intersection(set2): if old_dictionary[label] != [] and new_dictionary[label] != []: if len(old_dictionary[label][0]) == len(new_dictionary[label][0]): combined_dict[label].append(old_dictionary[label][0]) combined_dict[label].append(new_dictionary[label][0]) # Label cells now_cells = self.label_cells(old_cells, now_cells) # Define a colony with labeled nodes, edges and cells edges2 = [e for e in now_edges if e.radius is not None] now_nodes, now_cells, edges2 = self.assign_intial_guesses(now_nodes, now_cells, old_cells, edges2, old_edges) new_colony = colony(now_cells, edges2, now_nodes) return new_colony, new_dictionary
[docs] def label_cells(self, old_cells, now_cells): """ Now_nodes is the list of nodes at the current time step These nodes have labels based on previous time steps old_cells - cells from prev time step now_cells - cells in current time step """ for j, cc in enumerate(old_cells): closest_new_cell = min([c for c in now_cells], key=lambda p: np.linalg.norm(np.subtract(cc.centroid(), p.centroid()))) if not closest_new_cell.label: if np.linalg.norm(np.subtract(cc.centroid(), closest_new_cell.centroid())) < 100: closest_new_cell.label = cc.label max_label = max([c.label for c in now_cells if c.label != []]) if max_label > 999: count = max_label + 1 else: count = 1000 for j, cc in enumerate(now_cells): if not cc.label and cc.label != 0: now_cells[j].label = count count += 1 return now_cells
[docs] def assign_intial_guesses(self, now_nodes, now_cells, old_cells, edges2, old_edges): """ Process - (1) Call track_timestep - this assigns a generic labeling of nodes and cells to number_old (by calling initial_numbering) and also retrieves a dictionary relating each labelled node to its connected edge vectors. It then assigns labels to nodes, edges and cells in number_now based on this older timestep. Summary This function gives you labelled nodes and cells for number_now and number_old (edges saved in dictionary - combined_dict) (2) Assign 'guess tension/pressure' (for number_now) based on the 'true tension/pressure' (for number_old) by matching labels """ for cc in now_cells: if cc.label: if cc.label in [old.label for old in old_cells]: match_old_cell = [old for old in old_cells if old.label == cc.label][0] cc.guess_pressure = match_old_cell.pressure for ed in old_edges: label = ed.label for new_ed in edges2: if new_ed.label == label: if not new_ed.guess_tension: new_ed.guess_tension = ed.tension return now_nodes, now_cells, edges2
[docs] def first_computation(self, number_first, solver=None, type=None, **kwargs): """ Main first computation Retuns a colony at number_first Parameters ------------- number_first - number specifying first time step """ colonies = {} nodes, dictionary, cells, edges = self.initial_numbering(number_first) edges2 = [e for e in edges if e.radius is not None] if type == 'Jiggling': name = str(0) else: name = str(number_first) colonies[name] = colony(cells, edges2, nodes) tensions, p_t, a_mat = colonies[name].calculate_tension(solver=solver, **kwargs) pressures, p_p, b_mat = colonies[name].calculate_pressure(solver=solver, **kwargs) colonies[name].tension_matrix = a_mat colonies[name].pressure_matrix = b_mat return colonies, dictionary
[docs] def main_computation_based_on_prev(self, numbers, colonies=None, index=None, old_dictionary=None, solver=None, **kwargs): """ Recursive loop that cycles through all the time steps Steps - (1) Call self.first_computation() - returns first colony with generic labeling (2) Call self.track_timestep() - returns new colony that used info in the old colony to assign some initial guesses for tensions and pressures. saved in edge.guess_tension and cell.guess_pressure (3) Calculate tension and pressure on the new colony (4) Call this function again. Keep doing this till we reach the max number (5) Return colonies """ if not colonies: colonies, old_dictionary = self.first_computation(numbers[0], solver, type='Jiggling', **kwargs) print('Solver is', solver) print('First colony', colonies) colonies[str(0)].dictionary = old_dictionary index = 0 colonies[str(index + 1)], new_dictionary = self.track_timestep(colonies[str(index)], old_dictionary, numbers[index + 1]) colonies[str(index + 1)].dictionary = new_dictionary tensions, p_t, a_mat = colonies[str(index + 1)].calculate_tension(solver=solver, **kwargs) pressures, p_p, b_mat = colonies[str(index + 1)].calculate_pressure(solver=solver, **kwargs) # Save tension and pressure matrix colonies[str(index + 1)].tension_matrix = a_mat colonies[str(index + 1)].pressure_matrix = b_mat print('Next colony number', str(index + 1)) index = index + 1 if index < len(numbers) - 1: colonies = self.main_computation_based_on_prev(numbers, colonies, index, new_dictionary, solver, **kwargs) return colonies
[docs] def alternative_computation_based_on_prev(self, numbers, colonies=None, index=None, old_dictionary=None, solver=None, **kwargs): """ Same as above, except can store colonies in a dictionary numbered as their input time points """ if not colonies: colonies, old_dictionary = self.first_computation(numbers[0], solver, **kwargs) colonies[str(numbers[0])].dictionary = old_dictionary index = 0 if numbers[index + 1] == numbers[index]: colonies[str(index + 1)], new_dictionary = self.track_timestep(colonies[str(numbers[index])], old_dictionary, numbers[index + 1]) colonies[str(index + 1)].dictionary = new_dictionary tensions, p_t, a_mat = colonies[str(index + 1)].calculate_tension(solver=solver, **kwargs) pressures, p_p, b_mat = colonies[str(index + 1)].calculate_pressure(solver=solver, **kwargs) # Save tension and pressure matrix colonies[str(index + 1)].tension_matrix = a_mat colonies[str(index + 1)].pressure_matrix = b_mat print('Next colony number', str(index + 1)) else: colonies[str(numbers[index + 1])], new_dictionary = self.track_timestep(colonies[str(numbers[index])], old_dictionary, numbers[index + 1]) colonies[str(numbers[index + 1])].dictionary = new_dictionary tensions, p_t, a_mat = colonies[str(numbers[index + 1])].calculate_tension(solver=solver, **kwargs) pressures, p_p, b_mat = colonies[str(numbers[index + 1])].calculate_pressure(solver=solver, **kwargs) # Save tension and pressure matrix colonies[str(numbers[index + 1])].tension_matrix = a_mat colonies[str(numbers[index + 1])].pressure_matrix = b_mat print('Next colony number', str(numbers[index + 1])) index = index + 1 if index < len(numbers) - 1: colonies = self.alternative_computation_based_on_prev(numbers, colonies, index, new_dictionary, solver, **kwargs) return colonies