Source code for tissue_purifier.io.read_anndata_from_csv

from typing import Union, Optional, Iterator, Iterable, Generator
import numpy
from anndata import AnnData
from os import PathLike
import os.path
from pathlib import Path
import scipy
import pandas as pd
import gzip
import bz2


[docs]def anndata_from_expression_csv(filename: str, key: str, transpose: bool, top_n_rows: int = None): """ Read a csv file with the expression data (i.e. count matrix) and returns an anndata object. To be used when your collaborators give you a .csv file instead of a .h5ad file. If :attr:`transpose == False`: The csv is expected to have a header: 'barcode', 'gene_name_1', ..., 'gene_name_N'. Each entry is expected to be something-like: ACCDAT, 2, 0, ...., 1 If :attr:`transpose == True`: The csv is expected to have a header: 'gene', 'barcode_name_1', ..., 'barcode_name_N'. Each entry is expected to be something-like: Arhgap18, 2, 0, ...., 1 Args: filename: the path to the csv file to read key: the column name associated with the observations. It defaults to 'barcode' is :attr:`transpose` == False and 'gene' if :attr:`transpose` == True. transpose: bool, whether the matrix is gene_by_cell or cell_by_gene top_n_rows: int, the number of the top rows to read. Set to a small value (like 20) for debugging. Note: The output will always be cell_by_gene (i.e. cells=obs, genes=var) regardless the value of :attr:`transpose` Returns: adata: An anndata object with (i) anndata.X the counts in a scipy Compressed Sparse Row format (ii) anndata.obs the observation name (often the cellular barcodes) (iii) anndata.var the variable names (often the gene names) """ def read_top_n_rows(_filename, _n_rows): _df_tmp = pd.read_csv(_filename, nrows=_n_rows) _dir_name = os.path.dirname(_filename) _basename = os.path.basename(_filename) _new_filename = os.path.join(_dir_name, "debug_"+_basename) _df_tmp.to_csv(_new_filename) return _new_filename if isinstance(top_n_rows, int) and 1 <= top_n_rows <= 1000: # this functionality is meant for debug. n_rows should be a small number, i.e. <= 1000 new_filename = read_top_n_rows(filename, top_n_rows) else: # will read the entire file new_filename = filename # extract the name of the columns and check which column in the barcode column df_tmp = pd.read_csv(new_filename, nrows=2) col_names = list(df_tmp.columns) observation_col_index = col_names.index(key) # print("barcode_col_index -> {0}, len(col_names) -> {1}".format(barcode_col_index, len(col_names))) columns_to_read = numpy.arange(observation_col_index, len(col_names)) first_column_names = observation_col_index is not None return read_text(new_filename, transpose=transpose, delimiter=',', columns_to_read=columns_to_read, first_column_names=first_column_names, dtype='int16')
def read_text( filename: Union[PathLike, Iterator[str]], transpose: bool, delimiter: Optional[str] = None, columns_to_read: Optional[numpy.ndarray] = None, first_column_names: Optional[bool] = None, dtype: str = "float32") -> AnnData: """ Read `.txt`, `.tab`, `.data` (text) file or csv files (in that case set delimiter=',') and returns an anndata object Args: filename: Data file, filename or stream. delimiter: Delimiter that separates data within text file. If `None`, will split at arbitrary number of white spaces, which is different from enforcing splitting at single white space `' '`. columns_to_read: An array with the integer index corresponding to the columns to read. first_column_names: Assume the first column stores row names (most likely the barcodes). dtype: Numpy data type for the data (not the column names). transpose: bool, where the data is cell_by_gene or gene_by_cell Returns: An anndata object. """ if not isinstance(filename, (PathLike, str, bytes)): return _read_text(filename, delimiter, first_column_names, dtype, transpose) filename = Path(filename) if filename.suffix == ".gz": with gzip.open(str(filename), mode="rt") as f: return _read_text(f, delimiter, columns_to_read, first_column_names, dtype, transpose) elif filename.suffix == ".bz2": with bz2.open(str(filename), mode="rt") as f: return _read_text(f, delimiter, columns_to_read, first_column_names, dtype, transpose) else: with filename.open() as f: return _read_text(f, delimiter, columns_to_read, first_column_names, dtype, transpose) def _iter_lines(file_like: Iterable[str]) -> Generator[str, None, None]: """ Helper for iterating only nonempty lines without line breaks """ for line in file_like: line = line.rstrip("\r\n") if line: yield line def _read_line_data(line, delimiter, columns_to_read): """ Helper to read only the columns you are interested in """ line_numpy = numpy.array(line.split(delimiter)) if columns_to_read is None: line_data = line_numpy else: line_data = line_numpy[columns_to_read] return line_data def _read_text( f: Iterator[str], delimiter: Optional[str], columns_to_read: Optional[numpy.ndarray], first_column_names: Optional[bool], dtype: str, transpose: bool) -> AnnData: # initialize the storage comments = [] data = [] col_names = [] row_names = [] # iterator over the lines lines = _iter_lines(f) # read header and column names for line in lines: if line.startswith("#"): comment = line.lstrip("# ") if comment: comments.append(comment) else: if delimiter is not None and delimiter not in line: raise ValueError(f"Did not find delimiter {delimiter!r} in first line.") line_data = _read_line_data(line, delimiter, columns_to_read) # the first row could have the columns names in it if not is_float(line_data[-1]): col_names = line_data.tolist() else: if not is_float(line_data[0]) or first_column_names: first_column_names = True row_names.append(line_data[0]) data.append(line_data[1:].astype(dtype=dtype)) else: data.append(line_data.astype(dtype=dtype)) break # try reading col_names from the last comment line if not col_names: if len(comments) > 0: col_names = numpy.array(comments[-1].split()) else: # just numbers as col_names col_names = numpy.arange(len(data[0])).astype(str) col_names = numpy.array(col_names, dtype=str) # read another line to check if first column contains row names or not if first_column_names is None: first_column_names = False for line in lines: line_data = _read_line_data(line, delimiter, columns_to_read) if first_column_names or not is_float(line_data[0]): first_column_names = True row_names.append(line_data[0]) data.append(line_data[1:].astype(dtype=dtype)) else: data.append(line_data.astype(dtype=dtype)) break # if row names are just integers if len(data) > 1 and data[0].size != data[1].size: first_column_names = True col_names = numpy.array(data[0]).astype(int).astype(str) row_names.append(data[1][0].astype(int).astype(str)) data = [data[1][1:]] # parse the file for line in lines: line_data = _read_line_data(line, delimiter, columns_to_read) if first_column_names: row_names.append(line_data[0]) data.append(line_data[1:].astype(dtype=dtype)) else: data.append(line_data.astype(dtype=dtype)) if data[0].size != data[-1].size: raise ValueError( f"Length of first line ({data[0].size}) is different " f"from length of last line ({data[-1].size})." ) data = numpy.array(data, dtype=dtype) # transform row_names if not row_names: row_names = numpy.arange(len(data)).astype(str) else: row_names = numpy.array(row_names) for iname, name in enumerate(row_names): row_names[iname] = name.strip('"') # adapt col_names if necessary if col_names.size > data.shape[1]: col_names = col_names[1:] for iname, name in enumerate(col_names): col_names[iname] = name.strip('"') if transpose: return AnnData( scipy.sparse.csr_matrix(numpy.transpose(data)), obs=dict(obs_names=col_names), var=dict(var_names=row_names), dtype=dtype) else: return AnnData( scipy.sparse.csr_matrix(data), obs=dict(obs_names=row_names), var=dict(var_names=col_names), dtype=dtype) def is_float(string): """ Check whether string is float. See also -------- http://stackoverflow.com/questions/736043/checking-if-a-string-can-be-converted-to-float-in-python """ try: float(string) return True except ValueError: return False