
Commit 781243f

Further import and format cleanups

1 parent: 68e2cab

12 files changed: +102 -86 lines

rnaseq_pipeline/gsheet.py (+8, -9)

@@ -1,18 +1,16 @@
-import argparse
+import logging
 import logging
 import os
 import os.path
 import pickle
-import sys
-from os.path import dirname, expanduser, join
-from pkg_resources import resource_filename
+from os.path import join
 
-from googleapiclient.discovery import build
-from google_auth_oauthlib.flow import InstalledAppFlow
-from google.auth.transport.requests import Request
-import luigi
 import pandas as pd
 import xdg.BaseDirectory
+from google.auth.transport.requests import Request
+from google_auth_oauthlib.flow import InstalledAppFlow
+from googleapiclient.discovery import build
+from pkg_resources import resource_filename
 
 SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly']
 CREDENTIALS_FILE = resource_filename('rnaseq_pipeline', 'credentials.json')

@@ -47,7 +45,8 @@ def retrieve_spreadsheet(spreadsheet_id, sheet_name):
     service = build('sheets', 'v4', credentials=_authenticate(), cache_discovery=None)
 
     # Retrieve the documents contents from the Docs service.
-    rnaseq_pipeline_queue = service.spreadsheets().values().get(spreadsheetId=spreadsheet_id, range=sheet_name).execute()
+    rnaseq_pipeline_queue = service.spreadsheets().values().get(spreadsheetId=spreadsheet_id,
+                                                                range=sheet_name).execute()
 
     # this will fail if people add new columns
     df = pd.DataFrame(rnaseq_pipeline_queue['values'][1:], columns=rnaseq_pipeline_queue['values'][0])
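
The reordering above follows a consistent convention: standard-library imports first, third-party imports second, plain import statements ahead of from-imports, and each group alphabetized. The commit does not say which tool produced it, so the snippet below is only an illustration of that convention using names from this file; it is not part of the change:

    # Hypothetical module header illustrating the grouping applied in this commit.
    # Group 1: standard library (plain imports first, then from-imports, alphabetized).
    import logging
    import os.path
    from os.path import join

    # Group 2: third-party packages, same ordering rule.
    import pandas as pd
    from googleapiclient.discovery import build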

rnaseq_pipeline/platforms.py (+24, -24)

@@ -13,7 +13,7 @@ def get_trim_single_end_reads_task(r1, dest, **kwargs):
         pass
 
     @abstractmethod
-    def get_trim_paired_reads_task(r1,r2, r1_dest, r2_dest, **kwargs):
+    def get_trim_paired_reads_task(r1, r2, r1_dest, r2_dest, **kwargs):
         pass
 
 class BgiPlatform(Platform):

The remaining hunks (@@ -33,18 +33,18 @@, @@ -59,18 +59,18 @@ and @@ -85,11 +85,11 @@) only re-indent the wrapped argument lists of the trimming-task factories; the argument text itself is unchanged:

BgiPlatform:

    def get_trim_single_end_reads_task(self, r1, dest, **kwargs):
        return cutadapt.TrimReads(
            r1,
            dest,
            adapter_3prime=BgiPlatform.FORWARD_FILTER,
            **kwargs)

    def get_trim_paired_reads_task(self, r1, r2, r1_dest, r2_dest, **kwargs):
        return cutadapt.TrimPairedReads(
            r1, r2,
            r1_dest, r2_dest,
            adapter_3prime=BgiPlatform.FORWARD_FILTER,
            reverse_adapter_3prime=BgiPlatform.REVERSE_FILTER,
            **kwargs)

IlluminaPlatform:

    def get_trim_single_end_reads_task(self, r1, dest, **kwargs):
        return cutadapt.TrimReads(
            r1,
            dest,
            adapter_3prime=IlluminaPlatform.UNIVERSAL_ADAPTER,
            **kwargs)

    def get_trim_paired_reads_task(self, r1, r2, r1_dest, r2_dest, **kwargs):
        return cutadapt.TrimPairedReads(
            r1, r2,
            r1_dest, r2_dest,
            adapter_3prime=IlluminaPlatform.UNIVERSAL_ADAPTER,
            reverse_adapter_3prime=IlluminaPlatform.UNIVERSAL_ADAPTER,
            **kwargs)

IlluminaNexteraPlatform:

    def get_trim_single_end_reads_task(self, r1, dest, **kwargs):
        return cutadapt.TrimReads(
            r1,
            dest,
            cut=12,
            adapter_3prime=IlluminaNexteraPlatform.NEXTERA_ADAPTER,
            **kwargs)

    def get_trim_paired_reads_task(self, r1, r2, r1_dest, r2_dest, **kwargs):
        raise NotImplementedError
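
For context, the factory methods above return configured cutadapt trimming tasks. A minimal usage sketch, mirroring the call made in tests/test_platforms.py below; the file names are placeholders and the sketch is not part of the commit:

    from rnaseq_pipeline.platforms import BgiPlatform

    # 'BGISEQ-500' is the instrument string used in the test; file names are hypothetical.
    platform = BgiPlatform('BGISEQ-500')
    task = platform.get_trim_single_end_reads_task('sample.fastq.gz', 'sample.trimmed.fastq.gz')
    # task is a cutadapt.TrimReads luigi task configured with BgiPlatform.FORWARD_FILTER
    # as its 3' adapter, per get_trim_single_end_reads_task above.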

rnaseq_pipeline/sources/arrayexpress.py (+10, -6)

@@ -1,11 +1,11 @@
-from urllib.request import urlretrieve
 import os
 from os.path import join
+from urllib.request import urlretrieve
 
 import luigi
-from luigi.task import WrapperTask
 import pandas as pd
 from bioluigi.tasks.utils import TaskWithOutputMixin
+from luigi.task import WrapperTask
 
 from ..config import rnaseq_pipeline
 from ..platforms import IlluminaPlatform

@@ -21,11 +21,13 @@ class DownloadArrayExpressFastq(luigi.Task):
     def run(self):
         with self.output().temporary_path() as dest_filename:
             urlretrieve(self.fastq_url,
-                        reporthook=lambda numblocks, blocksize, totalsize: self.set_progress_percentage(100.0 * numblocks * blocksize / totalsize),
+                        reporthook=lambda numblocks, blocksize, totalsize: self.set_progress_percentage(
+                            100.0 * numblocks * blocksize / totalsize),
                         filename=dest_filename)
 
     def output(self):
-        return luigi.LocalTarget(join(cfg.OUTPUT_DIR, cfg.DATA, 'arrayexpress', self.sample_id, os.path.basename(self.fastq_url)))
+        return luigi.LocalTarget(
+            join(cfg.OUTPUT_DIR, cfg.DATA, 'arrayexpress', self.sample_id, os.path.basename(self.fastq_url)))
 
 class DownloadArrayExpressSample(TaskWithOutputMixin, WrapperTask):
     experiment_id = luigi.Parameter()

@@ -47,8 +49,10 @@ class DownloadArrayExpressExperiment(TaskWithOutputMixin, WrapperTask):
 
     def run(self):
         # store metadata locally under metadata/arrayexpress/<experiment_id>.sdrf.txt
-        ae_df = pd.read_csv('http://www.ebi.ac.uk/arrayexpress/files/{0}/{0}.sdrf.txt'.format(self.experiment_id), sep='\t')
+        ae_df = pd.read_csv('http://www.ebi.ac.uk/arrayexpress/files/{0}/{0}.sdrf.txt'.format(self.experiment_id),
+                            sep='\t')
         ae_df = ae_df[ae_df['Comment[LIBRARY_STRATEGY]'] == 'RNA-Seq']
         # FIXME: properly handle the order of paired FASTQs
-        yield [DownloadArrayExpressSample(experiment_id=self.experiment_id, sample_id=sample_id, fastq_urls=s['Comment[FASTQ_URI]'].sort_values().tolist())
+        yield [DownloadArrayExpressSample(experiment_id=self.experiment_id, sample_id=sample_id,
+                                          fastq_urls=s['Comment[FASTQ_URI]'].sort_values().tolist())
                for sample_id, s in ae_df.groupby('Comment[ENA_SAMPLE]')]
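
The reporthook lambda that was re-wrapped above simply converts urlretrieve's block-count callback into a luigi progress percentage. A named-function equivalent, shown only to spell out the arithmetic (the helper name is made up):

    def _report_progress(task, numblocks, blocksize, totalsize):
        # urlretrieve calls the hook with the number of blocks fetched so far,
        # the block size in bytes and the total size in bytes; their product over
        # the total is the fraction downloaded, scaled to a percentage for luigi.
        task.set_progress_percentage(100.0 * numblocks * blocksize / totalsize)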

rnaseq_pipeline/sources/gemma.py (+9, -4)

@@ -3,8 +3,8 @@
 import os
 from os.path import join
 
-from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, DynamicWrapperTask
 import luigi
+from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, DynamicWrapperTask
 from luigi.util import requires
 
 from .geo import DownloadGeoSample

@@ -33,9 +33,12 @@ def run(self):
             accession = sample['accession']['accession']
             external_database = sample['accession']['externalDatabase']['name']
             if external_database == 'GEO':
-                download_sample_tasks.append(DownloadGeoSample(accession, metadata=dict(experiment_id=self.experiment_id, sample_id=accession)))
+                download_sample_tasks.append(
+                    DownloadGeoSample(accession, metadata=dict(experiment_id=self.experiment_id, sample_id=accession)))
             elif external_database == 'SRA':
-                download_sample_tasks.append(DownloadSraExperiment(accession, metadata=dict(experiment_id=self.experiment_id, sample_id=accession)))
+                download_sample_tasks.append(DownloadSraExperiment(accession,
+                                                                   metadata=dict(experiment_id=self.experiment_id,
+                                                                                 sample_id=accession)))
             else:
                 logger.warning('Downloading %s from %s is not supported.', accession, external_database)
                 continue

@@ -51,7 +54,9 @@ def run(self):
                 continue
 
             if len(sample.output()) == 0:
-                logger.warning('GEO sample %s has no associated FASTQs from which batch information can be extracted.', sample.sample_id)
+                logger.warning(
+                    'GEO sample %s has no associated FASTQs from which batch information can be extracted.',
+                    sample.sample_id)
                 continue
 
             # TODO: find a cleaner way to obtain the SRA run accession
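
The wrapped append calls above dispatch on the sample's external database. For illustration only, the same branching expressed as a lookup table; the helper function is hypothetical and not part of the commit:

    from rnaseq_pipeline.sources.geo import DownloadGeoSample
    from rnaseq_pipeline.sources.sra import DownloadSraExperiment

    _DOWNLOADERS = {'GEO': DownloadGeoSample, 'SRA': DownloadSraExperiment}

    def make_download_task(external_database, accession, experiment_id):
        # Return the download task for one Gemma sample, or None when the
        # external database is unsupported (mirrors the if/elif/else above).
        task_cls = _DOWNLOADERS.get(external_database)
        if task_cls is None:
            return None
        return task_cls(accession, metadata=dict(experiment_id=experiment_id, sample_id=accession))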

rnaseq_pipeline/sources/geo.py (+16, -13)

@@ -6,29 +6,25 @@
 import logging
 import os
 import re
-import requests
 import tarfile
 import tempfile
-
 from datetime import timedelta
 from functools import lru_cache
 from os.path import join
-from subprocess import Popen
 from urllib.parse import urlparse, parse_qs
 from xml.etree import ElementTree
 
 import luigi
 import requests
-
 from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, DynamicWrapperTask, TaskWithMetadataMixin
 from luigi.util import requires
 
+from .sra import DownloadSraExperiment
 from ..config import rnaseq_pipeline
 from ..miniml_utils import collect_geo_samples, collect_geo_samples_info
-from ..platforms import Platform, BgiPlatform, IlluminaPlatform
+from ..platforms import BgiPlatform, IlluminaPlatform
 from ..targets import ExpirableLocalTarget
 from ..utils import RerunnableTaskMixin
-from .sra import DownloadSraExperiment
 
 cfg = rnaseq_pipeline()
 

@@ -53,7 +49,8 @@ def match_geo_platform(geo_platform):
         return BgiPlatform(geo_platform_title.split(' ')[0])
 
     # Illumina HiSeq X and NextSeq 550 platforms are not prefixed with Illumina
-    illumina_regex = [r'Illumina (.+) \(.+\)', r'(HiSeq X .+) \(.+\)', r'(NextSeq 550) \(.+\)', r'(NextSeq 2000) \(.+\)']
+    illumina_regex = [r'Illumina (.+) \(.+\)', r'(HiSeq X .+) \(.+\)', r'(NextSeq 550) \(.+\)',
+                      r'(NextSeq 2000) \(.+\)']
 
     for r in illumina_regex:
         illumina_match = re.match(r, geo_platform_title)

@@ -85,7 +82,8 @@ def run(self):
             f.write(res.text)
 
     def output(self):
-        return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}.xml'.format(self.gsm)), ttl=timedelta(days=14))
+        return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}.xml'.format(self.gsm)),
+                                    ttl=timedelta(days=14))
 
 @requires(DownloadGeoSampleMetadata)
 class DownloadGeoSample(DynamicTaskWithOutputMixin, DynamicWrapperTask):

@@ -131,22 +129,25 @@ class DownloadGeoSeriesMetadata(TaskWithMetadataMixin, RerunnableTaskMixin, luig
     def run(self):
         if self.output().is_stale():
             logger.info('%s is stale, redownloading...', self.output())
-        res = requests.get('https://ftp.ncbi.nlm.nih.gov/geo/series/'+ self.gse[:-3] + 'nnn/' + self.gse + '/miniml/' + self.gse + '_family.xml.tgz', stream=True)
+        res = requests.get('https://ftp.ncbi.nlm.nih.gov/geo/series/' + self.gse[
+                               :-3] + 'nnn/' + self.gse + '/miniml/' + self.gse + '_family.xml.tgz',
+                           stream=True)
         res.raise_for_status()
         # we need to use a temporary file because Response.raw does not allow seeking
         with tempfile.TemporaryFile() as tmp:
             for chunk in res.iter_content(chunk_size=1024):
                 tmp.write(chunk)
             tmp.seek(0)
-            with tarfile.open(fileobj=tmp, mode='r:gz') as fin, self.output().temporary_path() as fpath, open(fpath, 'wb') as f:
+            with tarfile.open(fileobj=tmp, mode='r:gz') as fin, self.output().temporary_path() as fpath, open(fpath,
+                                                                                                              'wb') as f:
                 reader = fin.extractfile(self.gse + '_family.xml')
                 while chunk := reader.read(1024):
                     f.write(chunk)
 
-
     def output(self):
         # TODO: remove the _family suffix
-        return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}_family.xml'.format(self.gse)), ttl=timedelta(days=14))
+        return ExpirableLocalTarget(join(cfg.OUTPUT_DIR, cfg.METADATA, 'geo', '{}_family.xml'.format(self.gse)),
+                                    ttl=timedelta(days=14))
 
 @requires(DownloadGeoSeriesMetadata)
 class DownloadGeoSeries(DynamicTaskWithOutputMixin, DynamicWrapperTask):

@@ -177,7 +178,9 @@ def run(self):
         with self.output().open('w') as info_out:
             for sample in samples:
                 if len(sample.output()) == 0:
-                    logger.warning('GEO sample %s has no associated FASTQs from which batch information can be extracted.', sample.sample_id)
+                    logger.warning(
+                        'GEO sample %s has no associated FASTQs from which batch information can be extracted.',
+                        sample.sample_id)
                     continue
 
                 # TODO: find a cleaner way to obtain the SRA run accession
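
The requests.get call that was wrapped in DownloadGeoSeriesMetadata.run builds the GEO MINiML archive URL from the series accession: GEO's FTP layout buckets series into GSExxxnnn directories, which is what the gse[:-3] + 'nnn' piece produces. A small worked example with a made-up accession:

    gse = 'GSE123456'  # hypothetical accession
    url = ('https://ftp.ncbi.nlm.nih.gov/geo/series/'
           + gse[:-3] + 'nnn/' + gse + '/miniml/' + gse + '_family.xml.tgz')
    # -> https://ftp.ncbi.nlm.nih.gov/geo/series/GSE123nnn/GSE123456/miniml/GSE123456_family.xml.tgz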

rnaseq_pipeline/sources/local.py (+5, -4)

@@ -1,9 +1,9 @@
-from glob import glob
 import os
+from glob import glob
 from os.path import join
 
-from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, DynamicWrapperTask
 import luigi
+from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, DynamicWrapperTask
 
 from ..config import rnaseq_pipeline
 

@@ -23,11 +23,12 @@ def platform(self):
 
     def output(self):
         # we sort to make sure that pair ends are in correct order
-        return [luigi.LocalTarget(f) for f in sorted(glob(join(cfg.OUTPUT_DIR, cfg.DATA, 'local', self.experiment_id, self.sample_id, '*.fastq.gz')))]
+        return [luigi.LocalTarget(f) for f in
+                sorted(glob(join(cfg.OUTPUT_DIR, cfg.DATA, 'local', self.experiment_id, self.sample_id, '*.fastq.gz')))]
 
 class DownloadLocalExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask):
     experiment_id = luigi.Parameter()
 
     def run(self):
         yield [DownloadLocalSample(self.experiment_id, os.path.basename(f))
-            for f in glob(join(cfg.OUTPUT_DIR, cfg.DATA, 'local', self.experiment_id, '*'))]
+               for f in glob(join(cfg.OUTPUT_DIR, cfg.DATA, 'local', self.experiment_id, '*'))]
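
DownloadLocalSample.output() above relies on sorted() to keep mate files in a stable R1/R2 order, as the comment in the hunk notes. A quick sketch of why that works; the directory and file names are hypothetical:

    from glob import glob

    # Lexicographic sorting puts ..._R1.fastq.gz ahead of ..._R2.fastq.gz,
    # so downstream paired-end tasks receive the mates in a consistent order.
    fastqs = sorted(glob('data/local/EXP1/SAMPLE1/*.fastq.gz'))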

rnaseq_pipeline/webviewer/__init__.py (+16, -10)

@@ -1,15 +1,16 @@
-from os import listdir
 from os.path import basename, getctime, join, dirname
-from glob import glob
 import datetime
+from glob import glob
+from os.path import basename, getctime, join, dirname
 
 import luigi
-from flask import Flask, send_file, render_template, url_for, request, abort
 import pandas as pd
+from flask import Flask, send_file, render_template, abort
 
 from rnaseq_pipeline.config import rnaseq_pipeline
-from rnaseq_pipeline.tasks import GenerateReportForExperiment, CountExperiment, ExtractGeoSeriesBatchInfo, SubmitExperimentDataToGemma, SubmitExperimentBatchInfoToGemma
 from rnaseq_pipeline.gemma import GemmaTaskMixin
+from rnaseq_pipeline.tasks import GenerateReportForExperiment, CountExperiment, SubmitExperimentDataToGemma, \
+    SubmitExperimentBatchInfoToGemma
 
 app = Flask('rnaseq_pipeline.webviewer')
 

@@ -31,7 +32,9 @@ def not_found(e):
 @app.route('/')
 def home():
     report_dir = join(cfg.OUTPUT_DIR, 'report')
-    latest_experiments = [(basename(path), basename(dirname(path)), datetime.datetime.now() - datetime.datetime.fromtimestamp(getctime(path))) for path in sorted(glob(join(report_dir, '*', '*')), key=lambda path: -getctime(path))]
+    latest_experiments = [(basename(path), basename(dirname(path)),
+                           datetime.datetime.now() - datetime.datetime.fromtimestamp(getctime(path))) for path in
+                          sorted(glob(join(report_dir, '*', '*')), key=lambda path: -getctime(path))]
     return render_template('index.html', latest_experiments=latest_experiments[:10])
 
 @app.route('/experiment/<experiment_id>')

@@ -44,14 +47,16 @@ def experiment_summary(experiment_id):
     submit_batch_info_task = SubmitExperimentBatchInfoToGemma(experiment_id)
     ebi_task = submit_batch_info_task.requires()
     if ebi_task.complete():
-        batch_info = pd.read_csv(ebi_task.output().path, sep='\t', names=['geo_sample_id', 'sra_run_id', 'geo_platform_id', 'sra_experiment_url', 'fastq_header'])
+        batch_info = pd.read_csv(ebi_task.output().path, sep='\t',
+                                 names=['geo_sample_id', 'sra_run_id', 'geo_platform_id', 'sra_experiment_url',
+                                        'fastq_header'])
     else:
         batch_info = None
 
     return render_template('experiment-summary.html',
-                          experiment_id=experiment_id, batch_info=batch_info,
-                          submit_data_task=submit_data_task,
-                          submit_batch_info_task=submit_batch_info_task)
+                           experiment_id=experiment_id, batch_info=batch_info,
+                           submit_data_task=submit_data_task,
+                           submit_batch_info_task=submit_batch_info_task)
 
 @app.route('/experiment/<experiment_id>/batch-info')
 def experiment_batch_info(experiment_id):

@@ -94,7 +99,8 @@ def experiment_report(experiment_id, reference_id=None):
     else:
         taxon = 'human'
         source = 'local'
-    generate_report_task = GenerateReportForExperiment(experiment_id, reference_id=reference_id, taxon=taxon, source=source)
+    generate_report_task = GenerateReportForExperiment(experiment_id, reference_id=reference_id, taxon=taxon,
+                                                       source=source)
     if not generate_report_task.complete():
         abort(404, f'No report available for {experiment_id} in {reference_id}.')
     return send_file(generate_report_task.output().path)
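
The latest_experiments comprehension that was re-wrapped in home() collects, for each report path, the leaf directory name, its parent directory name, and the report's age, newest first. Unrolled for readability; this is an illustration that reuses the module's imports and the report_dir local from home(), and is not part of the commit:

    latest_experiments = []
    for path in sorted(glob(join(report_dir, '*', '*')), key=lambda p: -getctime(p)):
        leaf = basename(path)             # last path component
        parent = basename(dirname(path))  # directory containing it
        age = datetime.datetime.now() - datetime.datetime.fromtimestamp(getctime(path))
        latest_experiments.append((leaf, parent, age))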

tests/test_geo.py (+4, -3)

@@ -1,9 +1,10 @@
-from rnaseq_pipeline.sources.geo import match_geo_platform, retrieve_geo_platform_miniml, DownloadGeoSampleMetadata, DownloadGeoSeriesMetadata
+import luigi
+
 from rnaseq_pipeline.platforms import IlluminaPlatform
+from rnaseq_pipeline.sources.geo import match_geo_platform, DownloadGeoSampleMetadata, \
+    DownloadGeoSeriesMetadata
 from rnaseq_pipeline.utils import remove_task_output
 
-
-
 def test_parse_illumina_platform():
     platform = match_geo_platform('GPL30172')
     assert isinstance(platform, IlluminaPlatform)

tests/test_platforms.py (+1, -1)

@@ -1,4 +1,4 @@
-from rnaseq_pipeline.platforms import Platform, BgiPlatform, IlluminaPlatform, IlluminaNexteraPlatform
+from rnaseq_pipeline.platforms import BgiPlatform, IlluminaPlatform, IlluminaNexteraPlatform
 
 def test_bgi_platform_trim_single_end_reads():
     task = BgiPlatform('BGISEQ-500').get_trim_single_end_reads_task('r1', 'r1_dest')
