Spaces:
Runtime error
Runtime error
| import holoviews as hv | |
| import numpy as np | |
| import pandas as pd | |
| import panel as pn | |
| import param | |
| from holoviews.operation.datashader import dynspread, rasterize | |
| from utils import ( | |
| DATASET_COLUMNS, | |
| DATASETS, | |
| DATASHADER_LOGO, | |
| DATASHADER_URL, | |
| DEFAULT_DATASET, | |
| DESCRIPTION, | |
| ESA_EASTING, | |
| ESA_NORTHING, | |
| MAJOR_TOM_LOGO, | |
| MAJOR_TOM_LYRICS, | |
| MAJOR_TOM_PICTURE, | |
| MAJOR_TOM_REF_URL, | |
| PANEL_LOGO, | |
| PANEL_URL, | |
| get_closest_rows, | |
| get_image, | |
| get_meta_data, | |
| ) | |
class DatasetInput(pn.viewable.Viewer):
    """Dataset picker that also exposes the chosen dataset's metadata.

    ``value`` holds the selected dataset name. ``data`` and ``columns`` are
    refreshed from it by ``_update_data``.
    """

    value = param.Selector(
        default=DEFAULT_DATASET,
        objects=DATASETS,
        allow_None=False,
        label="Dataset",
        doc="""The name of the dataset""",
    )
    data = param.DataFrame(allow_None=False, doc="""The metadata dataset""")
    columns = param.Dict(allow_None=False, doc="""The columns of the dataset""")

    def __panel__(self):
        """Return a radio-button group bound to the ``value`` parameter."""
        return pn.widgets.RadioButtonGroup.from_param(
            self.param.value, button_style="outline"
        )

    # The dependency declaration is required: without it nothing in this file
    # invokes the method, so ``data`` and ``columns`` would never be filled.
    # ``on_init=True`` additionally runs it once for the default dataset.
    @param.depends("value", watch=True, on_init=True)
    def _update_data(self):
        """Load the column mapping and metadata for the selected dataset.

        Both parameters are assigned in one ``param.update`` call so that
        watchers see them change together.
        """
        columns = DATASET_COLUMNS[self.value]
        # The loader is wrapped with ``pn.cache`` before being called.
        data = pn.cache(get_meta_data)(dataset=self.value)
        self.param.update(columns=columns, data=data)
class MapInput(pn.viewable.Viewer):
    """Rasterized map of sample locations with range and tap selection.

    ``data`` is the input frame (it may be given as a parameter reference).
    ``data_in_view`` holds the rows inside the current plot ranges and
    ``data_selected`` holds whatever ``get_closest_rows`` returns for the
    last tapped position.
    """

    data = param.DataFrame(allow_refs=True, allow_None=False)
    data_in_view = param.DataFrame(allow_None=False)
    data_selected = param.DataFrame(allow_None=False)

    _plot = param.Parameter(allow_None=False)
    _pointer_x = param.Parameter(allow_None=False)
    _pointer_y = param.Parameter(allow_None=False)
    _range_xy = param.Parameter(allow_None=False)
    _tap = param.Parameter(allow_None=False)
    updating = param.Boolean()

    def __panel__(self):
        """Return the map pane stacked above the row-count description."""
        return pn.Column(
            pn.pane.HoloViews(
                # Pass the parameter reference, not its current value, so the
                # pane follows the new plot built whenever ``data`` changes.
                self.param._plot,
                height=550,
                width=800,
                loading=self.param.updating,
            ),
            self._description,
        )

    # Without this declaration the plot and its streams are never created.
    @param.depends("data", watch=True, on_init=True)
    def _handle_data_dask_change(self):
        """Rebuild the plot, its streams and the derived frames from ``data``.

        ``updating`` is held True for the duration of the rebuild.
        """
        with self.param.update(updating=True):
            data = self.data[["centre_easting", "centre_northing"]].copy()
            points = hv.Points(
                data, kdims=["centre_easting", "centre_northing"], vdims=[]
            )
            rangexy = hv.streams.RangeXY(source=points)
            tap = hv.streams.Tap(source=points, x=ESA_EASTING, y=ESA_NORTHING)

            agg = rasterize(
                points, link_inputs=True, x_sampling=0.0001, y_sampling=0.0001
            )
            dyn = dynspread(agg)
            dyn.opts(cmap="kr_r", colorbar=True)

            # Crosshair lines that follow the pointer position.
            pointerx = hv.streams.PointerX(x=ESA_EASTING, source=points)
            pointery = hv.streams.PointerY(y=ESA_NORTHING, source=points)
            vline = hv.DynamicMap(lambda x: hv.VLine(x), streams=[pointerx])
            hline = hv.DynamicMap(lambda y: hv.HLine(y), streams=[pointery])

            tiles = hv.Tiles(
                "https://tile.openstreetmap.org/{Z}/{X}/{Y}.png", name="OSM"
            ).opts(xlabel="Longitude", ylabel="Latitude")

            self.param.update(
                _plot=tiles * agg * dyn * hline * vline,
                _pointer_x=pointerx,
                _pointer_y=pointery,
                _range_xy=rangexy,
                _tap=tap,
            )

        # Bind the stream parameters to the update methods, then call each
        # bound function once so the derived frames are set immediately.
        update_viewed = pn.bind(
            self._update_data_in_view,
            rangexy.param.x_range,
            rangexy.param.y_range,
            watch=True,
        )
        update_viewed()
        update_selected = pn.bind(
            self._update_data_selected, tap.param.x, tap.param.y, watch=True
        )
        update_selected()

    def _update_data_in_view(self, x_range, y_range):
        """Set ``data_in_view`` to the rows of ``data`` inside both ranges.

        When either range is falsy the full frame is used unchanged.
        """
        if not x_range or not y_range:
            self.data_in_view = self.data
            return

        data = self.data
        data = data[
            (data.centre_easting.between(*x_range))
            & (data.centre_northing.between(*y_range))
        ]
        self.data_in_view = data.reset_index(drop=True)

    def _update_data_selected(self, tap_x, tap_y):
        """Set ``data_selected`` from ``get_closest_rows`` at the tap position."""
        self.data_selected = get_closest_rows(self.data, tap_x, tap_y)

    # Declared so the rendered text re-evaluates when the view changes.
    @param.depends("data_in_view")
    def _description(self):
        """Return the number of rows in view, formatted with thousands separators."""
        return f"Rows: {len(self.data_in_view):,}"
class ImageInput(pn.viewable.Viewer):
    """Viewer for one image of the selected sample.

    ``data`` and ``columns`` are inputs (both accept parameter references).
    The user picks a timestamp and an image type; ``meta_data``, ``image``
    and ``plot`` are then derived by ``_update_plot``.
    """

    data = param.DataFrame(
        allow_refs=True, allow_None=False, doc="""The metadata selected"""
    )
    columns = param.Dict(
        allow_refs=True, allow_None=False, doc="""The list of columns of the dataset"""
    )
    column_name = param.Selector(
        label="Image Type",
        allow_None=False,
        doc="""The name of the image type to view""",
    )
    updating = param.Boolean()

    meta_data = param.DataFrame()
    image = param.Parameter()
    plot = param.Parameter()

    _timestamp = param.Selector(
        label="Timestamp",
        objects=[None],
        doc="""The timestamp of the sample to view""",
    )

    def __panel__(self):
        """Return the selector row above the tabbed image / metadata / code views."""
        return pn.Column(
            pn.Row(
                pn.widgets.RadioButtonGroup.from_param(
                    self.param._timestamp,
                    button_style="outline",
                    align="end",
                    disabled=self.param.updating,
                ),
                pn.widgets.Select.from_param(
                    self.param.column_name, disabled=self.param.updating
                ),
            ),
            pn.Tabs(
                pn.pane.HoloViews(
                    self.param.plot,
                    height=800,
                    width=800,
                    name="Interactive Image",
                ),
                pn.pane.Image(
                    self.param.image,
                    name="Static Image",
                    width=800,
                ),
                pn.widgets.Tabulator(
                    self.param.meta_data,
                    name="Meta Data",
                    disabled=True,
                ),
                pn.pane.Markdown(self.code, name="Code"),
                dynamic=True,
                loading=self.param.updating,
            ),
        )

    # Without this declaration the timestamp options are never refreshed.
    @param.depends("data", watch=True, on_init=True)
    def _update_timestamp(self):
        """Refresh the timestamp options from ``data``.

        The current choice is kept when it is still offered; otherwise the
        first (sorted) timestamp is selected, or None for an empty frame.
        """
        if self.data.empty:
            default_value = None
            options = [None]
        else:
            options = sorted(self.data["timestamp"].unique())
            default_value = options[0]

        self.param._timestamp.objects = options
        if self._timestamp not in options:
            self._timestamp = default_value

    # Without this declaration the image-type options are never filled.
    @param.depends("columns", watch=True, on_init=True)
    def _update_column_names(self):
        """Refresh the image-type options from the keys of ``columns``.

        Falls back to "Thumbnail" when the current choice is not offered.
        """
        options = sorted(self.columns)
        default_value = "Thumbnail"

        self.param.column_name.objects = options
        if self.column_name not in options:
            self.column_name = default_value

    # A property, because the rest of the class uses ``self.column`` as a
    # value (passed to ``get_image`` and interpolated into the code snippet).
    @property
    def column(self):
        """The ``columns`` entry for the currently selected ``column_name``."""
        return self.columns[self.column_name]

    # NOTE(review): ``data`` is deliberately not listed; a change of selection
    # reaches this method when ``_update_timestamp`` resets ``_timestamp``.
    # A new selection with an identical timestamp would not refresh the plot.
    # Confirm whether that can happen.
    @param.depends("_timestamp", "column_name", watch=True, on_init=True)
    def _update_plot(self):
        """Recompute ``meta_data``, ``image`` and ``plot`` for the selection.

        With no data, timestamp or image type, the outputs are reset to a
        transposed frame, no image and an empty RGB element.
        """
        if self.data.empty or not self._timestamp or not self.column_name:
            self.meta_data = self.data.T
            self.image = None
            self.plot = hv.RGB(np.array([]))
            return

        with self.param.update(updating=True):
            # First row whose timestamp matches the selected one.
            row = self.data[self.data.timestamp == self._timestamp].iloc[0]
            self.meta_data = pd.DataFrame(row)
            self.image = image = pn.cache(get_image)(row, self.column)
            image_array = np.array(image)
            if image_array.ndim == 2:
                # Two-dimensional arrays are shown as a single-band image.
                self.plot = hv.Image(image_array).opts(
                    cmap="gray_r", xaxis=None, yaxis=None, colorbar=True
                )
            else:
                self.plot = hv.RGB(image_array).opts(xaxis=None, yaxis=None)

    @param.depends("meta_data", "column_name")
    def code(self):
        """Return Markdown with a script that reproduces the current image.

        Returns an empty string while no metadata is available.
        """
        # ``meta_data`` defaults to None, so guard before touching ``.empty``.
        if self.meta_data is None or self.meta_data.empty:
            return ""

        parquet_url = self.meta_data.T["parquet_url"].iloc[0]
        parquet_row = self.meta_data.T["parquet_row"].iloc[0]
        # The snippet below must keep its own indentation to be valid Python.
        return f"""\
```bash
pip install aiohttp fsspec holoviews numpy panel pyarrow requests
```
```python
from io import BytesIO
import holoviews as hv
import numpy as np
import panel as pn
import pyarrow.parquet as pq
from fsspec.parquet import open_parquet_file
from PIL import Image
pn.extension()
parquet_url = "{parquet_url}"
parquet_row = {parquet_row}
column = "{self.column}"
with open_parquet_file(parquet_url, columns=[column]) as f:
    with pq.ParquetFile(f) as pf:
        first_row_group = pf.read_row_group(parquet_row, columns=[column])
        stream = BytesIO(first_row_group[column][0].as_py())
image = Image.open(stream)
image_array = np.array(image)
if image_array.ndim==2:
    plot = hv.Image(image_array).opts(cmap="gray", colorbar=True)
else:
    plot = hv.RGB(image_array)
plot.opts(xaxis=None, yaxis=None)
pn.panel(plot).servable()
```
```bash
panel serve app.py --autoreload
```
"""
class App(param.Parameterized):
    """Holds the two page areas: a static sidebar and a lazily filled main area.

    The main area starts as a loading placeholder; ``_update_main`` is
    registered with ``pn.state.onload`` to swap in the real content.
    """

    sidebar = param.Parameter()
    main = param.Parameter()

    def __init__(self, **params):
        """Build the sidebar and the placeholder, then register the loader."""
        super().__init__(**params)
        self.sidebar = self._create_sidebar()

        # Placeholder shown until ``_update_main`` replaces it.
        spinner_row = pn.Row(
            pn.indicators.LoadingSpinner(value=True, size=50),
            "**Loading data...**",
        )
        placeholder = pn.Column(spinner_row, MAJOR_TOM_LYRICS)
        self.main = pn.FlexBox(placeholder)

        pn.state.onload(self._update_main)

    def _create_sidebar(self):
        """Return the sidebar column: project images, description and logos."""
        reference = {"link_url": MAJOR_TOM_REF_URL, "sizing_mode": "stretch_width"}
        logo_layout = {"width": 200, "margin": (10, 20)}

        major_tom_logo = pn.pane.Image(MAJOR_TOM_LOGO, height=60, **reference)
        major_tom_picture = pn.pane.Image(MAJOR_TOM_PICTURE, **reference)
        panel_logo = pn.pane.Image(PANEL_LOGO, link_url=PANEL_URL, **logo_layout)
        datashader_logo = pn.pane.Image(
            DATASHADER_LOGO, link_url=DATASHADER_URL, **logo_layout
        )

        return pn.Column(
            major_tom_logo,
            major_tom_picture,
            DESCRIPTION,
            panel_logo,
            datashader_logo,
        )

    def _create_main_content(self):
        """Wire the three inputs together and return them as a 2-tuple.

        The map reads the dataset's ``data``; the image viewer reads the
        map's ``data_selected`` and the dataset's ``columns``.
        """
        dataset = DatasetInput()
        map_input = MapInput(data=dataset.param.data)
        image_input = ImageInput(
            data=map_input.param.data_selected,
            columns=dataset.param.columns,
        )
        left_column = pn.Column(dataset, map_input)
        return left_column, image_input

    def _update_main(self):
        """Replace everything in ``main`` with the freshly built content."""
        self.main[:] = [*self._create_main_content()]