""" ParGenFS algorithm with accessory functions
"""
import argparse
from operator import itemgetter
from math import sqrt
from typing import Dict, List, Set, Union
try:
from got.taxonomies.taxonomy import Taxonomy, Node
from got.taxonomies.ete3_functions import make_ete3_lifted, save_ete3
except ImportError as e:
from taxonomy import Taxonomy, Node
from ete3_functions import make_ete3_lifted, save_ete3
# Truncation threshold: pargenfs() passes it to truncate_weights(), which zeroes
# every leaf weight that is below it.
LIMIT = .15
# Gamma value that run() forwards to pargenfs() as ``gamma_v``.
GAMMA = .9
# Lambda value that run() forwards to pargenfs() as ``lambda_v``.
LAMBDA = .2
def enumerate_tree_layers(node: Node, current_layer: int = 0) -> None:
    """Stamp every node of a tree / sub-tree with its layer number.

    The layer number is stored in the ``e`` attribute of each node.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree
    current_layer : int, default=0
        layer number given to ``node``; every step down adds one

    Returns
    -------
    None
    """
    # Explicit stack of (node, layer) pairs instead of recursion.
    pending = [(node, current_layer)]
    while pending:
        current, depth = pending.pop()
        current.e = depth
        pending.extend((child, depth + 1) for child in current)
def get_cluster_k(tree_leaves: List[Node], node_names: List[str],
                  membership_matrix: List[List[float]], k: int) -> Dict[str, float]:
    """Build the membership dictionary of the k-th cluster.

    Parameters
    ----------
    tree_leaves : List[Node]
        all the leaves of the taxonomy
    node_names : List[str]
        string names of nodes; the i-th name is paired with the i-th row
        of ``membership_matrix``
    membership_matrix : List[List[float]]
        membership matrix; one row per entry of ``node_names``, and
        column ``k`` of a row is the membership in the k-th cluster
    k : int
        index of a cluster (column of ``membership_matrix``)

    Returns
    -------
    Dict[str, float]
        leaf name -> membership in the k-th cluster; leaves whose name
        is not in ``node_names`` get 0
    """
    weight_by_name = {}
    for name, row in zip(node_names, membership_matrix):
        weight_by_name[name] = row[k]

    cluster = {}
    for leaf in tree_leaves:
        cluster[leaf.name] = weight_by_name.get(leaf.name, 0)
    return cluster
def annotate_with_sum(node: Node, cluster: Dict[str, float]) -> float:
    """Write the cluster weights onto a tree / sub-tree.

    A leaf gets ``score`` and ``u`` set to the membership stored in
    ``cluster`` under its name (0.0 when the name is missing); any other
    node gets both attributes reset to 0.0.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree
    cluster : Dict[str, float]
        the cluster, as a mapping leaf name -> membership

    Returns
    -------
    float
        the not-normalized sum of squared leaf weights
    """
    if node.is_leaf:
        weight = cluster.get(node.name, .0)
        node.score = node.u = weight
        own_square = .0 + weight ** 2
    else:
        node.score = node.u = .0
        own_square = .0
    return sum((annotate_with_sum(child, cluster) for child in node), own_square)
def normalize_and_return_leaf_weights(node: Node, summ: float) -> List[List[Union[str, float]]]:
    """Normalize leaves' weights (annotations) and collect them.

    Every leaf weight ``u`` is divided in place by ``sqrt(summ)``.
    When ``summ`` is zero the weights are left untouched instead of
    raising ``ZeroDivisionError``: a zero sum of squares means all the
    weights are zero already.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree
    summ : float
        sum of squared leaf weights used for the normalization

    Returns
    -------
    List[List[Union[str, float]]]
        ``[weight, name]`` pairs of all the leaves, in depth-first
        (left-to-right) order, with the weights normalized
    """
    # The norm is the same for every leaf, so compute it once.
    norm = sqrt(summ) if summ else 0.0

    leaf_weights: List[List[Union[str, float]]] = []
    stack = [node]
    while stack:
        current = stack.pop()
        if current.is_leaf:
            if norm:
                current.u /= norm
            leaf_weights.append([current.u, current.name])
        # Reversed push keeps the left-to-right order of a recursive walk.
        stack.extend(reversed(list(current)))
    return leaf_weights
def truncate_weights(node: Node, threshold: float) -> float:
    """Zero out the leaves' weights that are below the threshold.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree
    threshold : float
        the threshold value; a leaf whose ``u`` is less than it gets
        ``u = 0``

    Returns
    -------
    float
        sum of the squared weights of the leaves that were kept
    """
    if not node.is_leaf:
        own_square = .0
    elif node.u < threshold:
        node.u = 0
        own_square = .0
    else:
        own_square = .0 + node.u ** 2
    return sum((truncate_weights(child, threshold) for child in node), own_square)
def set_internal_weights(node: Node) -> float:
    """Derive the weights of internal nodes from the leaves' weights.

    The ``u`` of every non-leaf node becomes the square root of the sum
    of the squared weights of the leaves below it.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    float
        sum of the squared leaf weights in the tree / sub-tree
    """
    if node.is_leaf:
        return node.u ** 2

    squares = sum((set_internal_weights(child) for child in node), .0)
    node.u = sqrt(squares)
    return squares
def prune_tree(node: Node) -> None:
    """Cut away the children of every zero-weight node.

    Children are pruned first (bottom-up). A node whose ``u`` is falsy
    then loses its children; if it was internal and none of its children
    had children of their own at that moment, its ``G`` is set to
    ``[node]``.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    None
    """
    if node.is_internal:
        for child in node:
            prune_tree(child)

    if node.u:
        return

    # Must be decided before the children are dropped below.
    only_childless_children = node.is_internal and not any(t.children for t in node)
    node.children = []
    if only_childless_children:
        node.G = [node]
def set_gaps_for_tree(node: Node) -> None:
    """Fill in the gaps (``G``) of every node of a tree / sub-tree.

    A node whose ``G`` is empty or unset gets the list of its children
    that have ``u == 0``; a non-empty ``G`` is left as it is.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    None
    """
    todo = [node]
    while todo:
        current = todo.pop()
        if not current.G:
            current.G = [child for child in current if child.u == 0]
        todo.extend(current)
def set_parameters(node: Node) -> None:
    """Set the parameters G, v and V for a tree / sub-tree.

    Children are processed first. Afterwards the node's ``G`` becomes
    its own ``G`` followed by the ``G`` lists of its children, with
    repeated names dropped (first occurrence wins); ``v`` becomes the
    parent's ``u`` (1.0 when there is no parent); ``V`` becomes the sum
    of ``v`` over the resulting ``G``, counting a ``v`` of None as 0.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    None
    """
    for child in node:
        set_parameters(child)

    candidates = list(node.G or [])
    for child in node:
        candidates.extend(child.G)

    # A dict keeps insertion order, so the first gap seen per name is kept.
    first_by_name = {}
    for gap in candidates:
        first_by_name.setdefault(gap.name, gap)
    node.G = list(first_by_name.values())

    if node.parent:
        node.v = node.parent.u
    else:
        node.v = 1.
    node.V = sum(0 if gap.v is None else gap.v for gap in node.G)
def reduce_edges(node: Node) -> None:
    """Collapse single-child edges in a tree / sub-tree.

    A node with exactly one child takes over that child's children, and
    the layer number ``e`` of everything that moved up is decreased by
    one. The (possibly new) children are then processed the same way.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    None
    """
    if len(node) == 1:
        node.children = node.children[0].children

        # Everything below the removed child is now one layer higher.
        lifted = list(node)
        while lifted:
            moved = lifted.pop()
            moved.e -= 1
            lifted.extend(moved)

    for child in node:
        reduce_edges(child)
def make_init_step(node: Node, gamma_v: float) -> None:
    """Init step of ParGenFS algorithm.

    Walks down to the leaves and initialises each of them: a leaf with
    ``u > 0`` gets ``H = [leaf]`` and ``p = gamma_v * u``, any other
    leaf gets ``H = []`` and ``p = 0``. Every leaf also gets ``L = []``
    and ``o = True``. Internal nodes are not modified.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree
    gamma_v : float
        gamma value

    Returns
    -------
    None
    """
    if node.is_internal:
        for child in node:
            make_init_step(child, gamma_v)
        return

    has_weight = node.u > 0
    node.H = [node] if has_weight else []
    node.L = []
    node.p = gamma_v * node.u if has_weight else 0
    node.o = True
def make_recursive_step(node: Node, gamma_v: float, lambda_v: float) -> None:
    """Recursive step of ParGenFS algorithm.

    Children are processed first. A node whose ``o`` flag is not set
    then compares ``u + lambda_v * V`` with the summed ``p`` of its
    children. If its own value is strictly smaller, the node gets
    ``H = [node]``, ``L = G`` and that value as ``p``. Otherwise it
    inherits the concatenated ``H`` and ``L`` of its children and the
    summed ``p``.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree
    gamma_v : float
        gamma value (only passed on to the recursive calls)
    lambda_v : float
        lambda value

    Returns
    -------
    None
    """
    if node.is_internal:
        for child in node:
            make_recursive_step(child, gamma_v, lambda_v)

    if node.o:
        return

    children_penalty = sum((0 if t.p is None else t.p for t in node), .0)
    own_penalty = node.u + lambda_v * node.V

    if own_penalty < children_penalty:
        node.H = [node]
        node.L = node.G
        node.p = own_penalty
        return

    node.H = [head for t in node if t.H is not None for head in t.H]
    node.L = [gap for t in node if t.L is not None for gap in t.L]
    node.p = children_penalty
def indicate_offshoots(node: Node) -> None:
    """Mark the offshoots in a tree / sub-tree.

    Every non-internal node that has a parent with an empty ``H`` list
    gets its ``of`` attribute set to 1; other nodes are left untouched.

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    None
    """
    if node.is_internal:
        for child in node:
            indicate_offshoots(child)
        return

    parent = node.parent
    if parent and not [head.name for head in parent.H]:
        node.of = 1
def make_result_table(node: Node) -> List[List[str]]:
    """Collect the result rows of a tree / sub-tree.

    Rows of the children come first, the row of ``node`` itself is last.
    Each row holds: the index (trailing dots removed), the name, ``u``,
    ``p`` and ``V`` rounded to 3 digits, and the ``G``, ``H`` and ``L``
    lists rendered as "index name" items joined with "; ".

    Parameters
    ----------
    node : Node
        the root of the taxonomy tree / sub-tree

    Returns
    -------
    List[List[str]]
        resulting table for printing / saving in a file
    """
    def render(nodes):
        # An unset (None) list is rendered as an empty cell.
        return "; ".join(" ".join([item.index, item.name]) for item in (nodes or []))

    rows = []
    if node.is_internal:
        for child in node:
            rows.extend(make_result_table(child))

    own_row = [node.index.rstrip("."), node.name]
    own_row.extend(str(round(value, 3)) for value in (node.u, node.p, node.V))
    own_row.extend(render(nodes) for nodes in (node.G, node.H, node.L))
    rows.append(own_row)
    return rows
def save_result_table(result_table: List[List[str]], filename: str = "table.csv") -> None:
    """Write the resulting table to a tab-separated text file.

    The rows are sorted, a header row is put in front, and every row is
    written as one tab-separated line. The file is always encoded as
    UTF-8, so names with non-ASCII characters are written the same way
    on every platform instead of depending on the locale's default
    encoding. The input list is not modified.

    Parameters
    ----------
    result_table : List[List[str]]
        table for saving
    filename : str, default="table.csv"
        name of the file for writing

    Returns
    -------
    None
    """
    header = ["index", "name", "u", "p", "V", "G", "H", "L"]
    # NOTE(review): rows built by make_result_table all have the same
    # length, so the len() part of the key only matters for ragged input;
    # confirm whether sorting by the length of the index was intended.
    rows = [header] + sorted(result_table, key=lambda row: (len(row), row))

    with open(filename, 'w', encoding='utf-8') as file_opened:
        file_opened.writelines('\t'.join(row) + '\n' for row in rows)

    print(f"Table saved in the file: {filename}")
def _print_positive_weights(leaf_weights: List[List[Union[str, float]]]) -> None:
    """Print ``[weight, name]`` pairs by descending weight, stopping at the first zero weight."""
    for weight, name in sorted(leaf_weights, key=itemgetter(0), reverse=True):
        if not weight:
            break
        print(f"{name:<60} {weight:.5f}")


def pargenfs(cluster: Dict[str, float], taxonomy_tree: Taxonomy,
             gamma_v: float = .2, lambda_v: float = .2,
             threshold: Union[float, None] = None) -> None:
    """Runs ParGenFS algorithm over a taxonomy tree.

    The tree is annotated in place, progress is printed, and the result
    is saved through ``save_result_table`` and ``save_ete3`` (both called
    with their default file names).

    Parameters
    ----------
    cluster : Dict[str, float]
        the cluster to generalize, as a mapping leaf name -> membership
    taxonomy_tree : Taxonomy
        the taxonomy tree
    gamma_v : float, default=.2
        gamma penalty value
    lambda_v : float, default=.2
        lambda penalty value
    threshold : float, optional
        leaf weights below this value are set to zero after the first
        normalization; the module-level ``LIMIT`` is used when omitted

    Returns
    -------
    None
    """
    # Resolved at call time so that a changed module-level LIMIT is honoured.
    limit = LIMIT if threshold is None else threshold
    root = taxonomy_tree.root

    enumerate_tree_layers(root)
    summ = annotate_with_sum(root, cluster)
    if summ == 0:
        # Normalizing would divide by sqrt(0); nothing can be lifted anyway.
        print("The cluster has no non-zero weights on the taxonomy leaves.")
        return

    leaf_weights = normalize_and_return_leaf_weights(root, summ)
    print(f"Number of leaves: {len(leaf_weights)}")
    print("All positive weights:")
    _print_positive_weights(leaf_weights)

    summ_after_trunc = truncate_weights(root, limit)
    if summ_after_trunc == 0:
        print("The threshold is too large. Try a smaller one.")
        return

    # Re-normalize so that the surviving weights have unit sum of squares.
    updated_leaf_weights = normalize_and_return_leaf_weights(root, summ_after_trunc)
    print("After transformation:")
    _print_positive_weights(updated_leaf_weights)

    print("Setting weights for internal nodes")
    # set_internal_weights returns the sum of the squared leaf weights.
    root_u = set_internal_weights(root)
    print(f"Membership in root: {root_u:.5f}")

    print("Pruning tree...")
    prune_tree(root)
    print("Setting gaps...")
    set_gaps_for_tree(root)
    print("Other parameters setting...")
    set_parameters(root)
    reduce_edges(root)

    print("ParGenFS main steps...")
    make_init_step(root, gamma_v)
    make_recursive_step(root, gamma_v, lambda_v)
    indicate_offshoots(root)

    print("Done. Saving...")
    result_table = make_result_table(root)
    save_result_table(result_table)
    ete3_desc = make_ete3_lifted(root)
    save_ete3(ete3_desc)
    print("ete representation saved.")
    print("Done.")
def _read_leaf_names(path: str) -> List[str]:
    """Read one node name per line; on tab-separated lines the second column is the name."""
    names = []
    with open(path, 'r') as file_opened:
        for line in file_opened:
            columns = line.split('\t')
            names.append(columns[1 if len(columns) > 1 else 0].strip())
    return names


def _read_membership_matrix(path: str) -> List[List[float]]:
    """Read whitespace-separated float rows, ignoring trailing blank lines."""
    with open(path, 'r') as file_opened:
        lines = file_opened.read().splitlines()
    # A final newline or a few empty lines at the end are harmless.
    while lines and not lines[-1].strip():
        lines.pop()

    matrix = []
    for number, line in enumerate(lines, 1):
        # split() accepts tabs, single spaces, runs of spaces and mixtures.
        cells = line.split()
        try:
            if not cells:
                # Skipping it would shift the rows against the node names.
                raise ValueError("empty row")
            matrix.append([float(cell) for cell in cells])
        except ValueError as err:
            raise ValueError(
                f"{path}, line {number}: cannot parse membership row {line!r}") from err
    return matrix


def run(taxonomy_file: str, taxonomy_leaves: str, clusters: str, cluster_number: int) -> None:
    """Obtains cluster and runs ParGenFS algorithm over a taxonomy tree.

    The module-level ``GAMMA`` and ``LAMBDA`` are used as the penalty
    values.

    Parameters
    ----------
    taxonomy_file : str
        taxonomy description in *.fvtr format
    taxonomy_leaves : str
        taxonomy leaves in *.txt format: one name per line, or
        tab-separated lines whose second column is the name
    clusters : str
        clusters' membership table in *.dat format: one row of
        whitespace-separated numbers per line, matched by position with
        the names read from ``taxonomy_leaves``
    cluster_number : int
        number of cluster for lifting (column of the membership table)

    Returns
    -------
    None

    Raises
    ------
    ValueError
        if a row of the membership table is empty or is not numeric
    """
    taxonomy_tree = Taxonomy(taxonomy_file)
    node_names = _read_leaf_names(taxonomy_leaves)
    membership_matrix = _read_membership_matrix(clusters)

    cluster = get_cluster_k(taxonomy_tree.leaves, node_names, membership_matrix, cluster_number)
    pargenfs(cluster, taxonomy_tree, gamma_v=GAMMA, lambda_v=LAMBDA)
if __name__ == '__main__':
    # Command-line entry point: four positional arguments, handed to run().
    cli = argparse.ArgumentParser(description="Lifting a cluster over a taxonomy.")
    for arg_name, arg_type, arg_help in (
            ("taxonomy_file", str, "taxonomy description in *.fvtr format"),
            ("taxonomy_leaves", str, "taxonomy leaves in *.txt format"),
            ("clusters", str, "clusters' membership table in *.dat format"),
            ("cluster_number", int, "number of cluster for lifting"),
    ):
        cli.add_argument(arg_name, type=arg_type, help=arg_help)

    options = cli.parse_args()
    run(options.taxonomy_file, options.taxonomy_leaves,
        options.clusters, options.cluster_number)