Module pygw.test.data_store_test
Source code
#
# Copyright (c) 2013-2022 Contributors to the Eclipse Foundation
#
# See the NOTICE file distributed with this work for additional information regarding copyright
# ownership. All rights reserved. This program and the accompanying materials are made available
# under the terms of the Apache License, Version 2.0 which accompanies this distribution and is
# available at http://www.apache.org/licenses/LICENSE-2.0.txt
# ===============================================================================================
import pytest
import os
from pygw.store import DataStoreFactory
from pygw.store.accumulo import AccumuloOptions
from pygw.store.bigtable import BigTableOptions
from pygw.store.cassandra import CassandraOptions
from pygw.store.dynamodb import DynamoDBOptions
from pygw.store.hbase import HBaseOptions
from pygw.store.kudu import KuduOptions
from pygw.store.redis import RedisOptions
from pygw.store.rocksdb import RocksDBOptions
from pygw.index import SpatialIndexBuilder
from pygw.query import VectorQueryBuilder
from .conftest import TEST_DIR, POINT_NUMBER_FIELD, POINT_TYPE_NAME
from .conftest import POINT_TYPE_ADAPTER
from .conftest import TEST_DATA
from .conftest import write_test_data
from .conftest import results_as_list
# Test Additions #
from ..statistics.data_type import CountStatistic
from ..statistics.field import NumericRangeStatistic
from ..statistics.index import DuplicateEntryCountStatistic
def test_add_type(test_ds):
    """Adding a type with one index makes the store report exactly that index and that type."""
    # given
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()

    # when
    test_ds.add_type(point_adapter, spatial_index)
    stored_indices = test_ds.get_indices()
    stored_types = test_ds.get_types()

    # then: the single stored index mirrors the one that was registered
    assert len(stored_indices) == 1
    stored_index = stored_indices[0]
    assert stored_index.get_name() == spatial_index.get_name()
    assert stored_index.get_index_strategy() == spatial_index.get_index_strategy()
    assert stored_index.get_index_model() == spatial_index.get_index_model()

    # and the single stored type carries the adapter's type name
    assert len(stored_types) == 1
    assert stored_types[0].get_type_name() == point_adapter.get_type_name()
def test_add_existing_type(test_ds):
    """Adding the same adapter/index pair twice still leaves a single index for the type."""
    # given: the type has already been registered once
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(point_adapter, spatial_index)

    # when: the identical registration is repeated
    test_ds.add_type(point_adapter, spatial_index)
    type_indices = test_ds.get_indices(point_adapter.get_type_name())

    # then
    assert len(type_indices) == 1
    only_index = type_indices[0]
    assert only_index.get_name() == spatial_index.get_name()
    assert only_index.get_index_strategy() == spatial_index.get_index_strategy()
    assert only_index.get_index_model() == spatial_index.get_index_model()
# Test Removing #
def test_remove_index(test_ds):
    """Removing one of two indices registered for a type leaves only the other one in the store."""
    # given: the point type is registered under two differently named indices
    point_adapter = POINT_TYPE_ADAPTER
    removed_index = SpatialIndexBuilder().set_name("idx1").create_index()
    kept_index = SpatialIndexBuilder().set_name("idx2").create_index()
    for spatial_index in (removed_index, kept_index):
        test_ds.add_type(point_adapter, spatial_index)

    # when
    test_ds.remove_index(point_adapter.get_type_name(), removed_index.get_name())
    remaining = test_ds.get_indices()

    # then
    assert len(remaining) == 1
    survivor = remaining[0]
    assert survivor.get_name() == kept_index.get_name()
    assert survivor.get_index_strategy() == kept_index.get_index_strategy()
    assert survivor.get_index_model() == kept_index.get_index_model()
def test_remove_index_last(test_ds):
    """Removing the only index of a registered type must raise a descriptive error."""
    # given
    index = SpatialIndexBuilder().create_index()
    adapter = POINT_TYPE_ADAPTER
    test_ds.add_type(adapter, index)

    # when: only the call under test sits inside the raises block, so an
    # unrelated failure during setup cannot be mistaken for the expected error
    with pytest.raises(Exception) as exc_info:
        test_ds.remove_index(index.get_name())

    # then: verified after the block has exited, because statements placed
    # after the raising call inside the block are never executed
    assert 'Adapters require at least one index' in str(exc_info.value)
def test_remove_index_non_exist(test_ds):
    """Removing an index name that was never added leaves the registered index untouched."""
    # given
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(point_adapter, spatial_index)

    # when: "Corgi" does not name any index in the store
    test_ds.remove_index("Corgi")

    # then
    remaining = test_ds.get_indices()
    assert len(remaining) == 1
def test_remove_type(test_ds):
    """Removing a type drops its data and its index association but keeps the index in the store."""
    # given: a populated point type
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(point_adapter, spatial_index)
    write_test_data(test_ds, spatial_index)

    # when
    test_ds.remove_type(point_adapter.get_type_name())
    everything = VectorQueryBuilder().build()
    leftover_features = results_as_list(test_ds.query(everything))

    # then
    indices_for_type = test_ds.get_indices(point_adapter.get_type_name())
    assert len(indices_for_type) == 0
    all_indices = test_ds.get_indices()
    assert len(all_indices) == 1
    assert len(leftover_features) == 0
# Test Deleting #
def test_delete(test_ds):
    """Deleting by a bounding-box query removes only the matching features, not indices or types."""
    # given: test data written to two indices of the same type
    point_adapter = POINT_TYPE_ADAPTER
    first_index = SpatialIndexBuilder().set_name("idx1").create_index()
    second_index = SpatialIndexBuilder().set_name("idx2").create_index()
    for spatial_index in (first_index, second_index):
        test_ds.add_type(point_adapter, spatial_index)
    write_test_data(test_ds, first_index, second_index)

    # when
    delete_builder = VectorQueryBuilder()
    # filter encompasses 10 features (1, 1) - (10, 10)
    bbox_filter = delete_builder.constraints_factory().cql_constraints("BBOX(the_geom, 0.5, 0.5, 10.5, 10.5)")
    delete_builder.constraints(bbox_filter)
    test_ds.delete(delete_builder.build())
    remaining_features = results_as_list(test_ds.query(VectorQueryBuilder().build()))

    # then
    assert len(test_ds.get_indices()) == 2
    assert len(test_ds.get_types()) == 1
    expected_remaining = len(TEST_DATA) - 10
    assert len(remaining_features) == expected_remaining
# Test Delete All #
def test_delete_all(test_ds):
    """``delete_all`` leaves the store without indices, types or queryable features."""
    # given: test data written to two indices of the same type
    point_adapter = POINT_TYPE_ADAPTER
    first_index = SpatialIndexBuilder().set_name("idx1").create_index()
    second_index = SpatialIndexBuilder().set_name("idx2").create_index()
    for spatial_index in (first_index, second_index):
        test_ds.add_type(point_adapter, spatial_index)
    write_test_data(test_ds, first_index, second_index)

    # when
    test_ds.delete_all()
    remaining_features = results_as_list(test_ds.query(VectorQueryBuilder().build()))

    # then
    remaining_indices = test_ds.get_indices()
    assert len(remaining_indices) == 0
    remaining_types = test_ds.get_types()
    assert len(remaining_types) == 0
    assert len(remaining_features) == 0
# Test Copy #
def test_copy(test_ds):
    """Copy everything from ``test_ds`` into a second RocksDB store and verify the copy.

    The target store is created in the ``datastore2`` directory under ``TEST_DIR``. After the
    copy it is expected to report the same single index, the same single type, and as many
    features as ``TEST_DATA`` has entries. The target store is emptied in a ``finally`` clause
    so that a failing assertion does not leave copied data behind in that directory.
    """
    # given
    options = RocksDBOptions()
    options.set_geowave_namespace("geowave.tests")
    options.set_directory(os.path.join(TEST_DIR, "datastore2"))
    ds2 = DataStoreFactory.create_data_store(options)
    try:
        adapter = POINT_TYPE_ADAPTER
        index = SpatialIndexBuilder().create_index()
        test_ds.add_type(adapter, index)
        write_test_data(test_ds, index)

        # when
        test_ds.copy_to(ds2)
        indices = ds2.get_indices()
        types = ds2.get_types()
        query = VectorQueryBuilder().build()
        res = results_as_list(ds2.query(query))

        # then
        assert len(test_ds.get_indices()) == 1
        assert len(indices) == 1
        assert indices[0].get_name() == index.get_name()
        assert indices[0].get_index_strategy() == index.get_index_strategy()
        assert indices[0].get_index_model() == index.get_index_model()
        assert len(types) == 1
        assert types[0].get_type_name() == adapter.get_type_name()
        assert len(res) == len(TEST_DATA)
    finally:
        # cleanup must not depend on the assertions above passing
        ds2.delete_all()
def test_copy_by_query(test_ds):
    """Copy only the features matching a bounding-box query into a second RocksDB store.

    The target store is created in the ``datastore2`` directory under ``TEST_DIR``. After the
    copy it is expected to report the same single index and type as the source, but only the
    10 features selected by the query. The target store is emptied in a ``finally`` clause so
    that a failing assertion does not leave copied data behind in that directory.
    """
    # given
    options = RocksDBOptions()
    options.set_geowave_namespace("geowave.tests")
    options.set_directory(os.path.join(TEST_DIR, "datastore2"))
    ds2 = DataStoreFactory.create_data_store(options)
    try:
        adapter = POINT_TYPE_ADAPTER
        index = SpatialIndexBuilder().create_index()
        test_ds.add_type(adapter, index)
        write_test_data(test_ds, index)

        # when
        qbldr = VectorQueryBuilder()
        constraints_factory = qbldr.constraints_factory()
        # filter encompasses 10 features (1, 1) - (10, 10)
        constraints = constraints_factory.cql_constraints("BBOX(the_geom, 0.5, 0.5, 10.5, 10.5)")
        qbldr.all_indices().constraints(constraints)
        test_ds.copy_to(ds2, qbldr.build())
        indices = ds2.get_indices()
        types = ds2.get_types()
        query = VectorQueryBuilder().build()
        res = results_as_list(ds2.query(query))

        # then
        assert len(test_ds.get_indices()) == 1
        assert len(indices) == 1
        assert indices[0].get_name() == index.get_name()
        assert indices[0].get_index_strategy() == index.get_index_strategy()
        assert indices[0].get_index_model() == index.get_index_model()
        assert len(types) == 1
        assert types[0].get_type_name() == adapter.get_type_name()
        assert len(res) == 10
    finally:
        # cleanup must not depend on the assertions above passing
        ds2.delete_all()
# Test Writer #
def test_create_writer(test_ds):
    """A writer can be created for a type that has been added to the store."""
    # given
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(point_adapter, spatial_index)

    # when
    type_name = point_adapter.get_type_name()
    point_writer = test_ds.create_writer(type_name)

    # then
    assert point_writer is not None
def test_create_writer_null(test_ds):
    """Requesting a writer for a type name that was never added yields ``None``."""
    # when: "Corgi" has not been registered as a type
    missing_writer = test_ds.create_writer("Corgi")

    # then
    assert missing_writer is None
def test_create_writer_null_other(test_ds):
    """An unknown type name yields ``None`` even after a writer was created for a known type."""
    # given: a registered type for which a writer has already been requested
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(point_adapter, spatial_index)
    known_type_name = point_adapter.get_type_name()
    test_ds.create_writer(known_type_name)

    # when
    missing_writer = test_ds.create_writer("Corgi")

    # then
    assert missing_writer is None
def test_write(test_ds):
    """Everything written through ``write_test_data`` comes back from an unconstrained query."""
    # given
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(point_adapter, spatial_index)

    # when
    write_test_data(test_ds, spatial_index)
    everything = VectorQueryBuilder().build()
    stored_features = results_as_list(test_ds.query(everything))

    # then
    expected_count = len(TEST_DATA)
    assert len(stored_features) == expected_count
def test_add_get_statistics(test_ds):
    """Statistics added to the store can be fetched individually and show up in the listings.

    One index statistic, one data type statistic and one field statistic are added under the
    tag 'test'; each is then looked up by type and tag, and finally searched for in the
    per-index, per-type and per-field statistic listings.
    """
    # given
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().set_name('idx').create_index()
    test_ds.add_type(point_adapter, spatial_index)
    write_test_data(test_ds, spatial_index)

    new_index_stat = DuplicateEntryCountStatistic('idx')
    new_type_stat = CountStatistic(POINT_TYPE_NAME)
    new_field_stat = NumericRangeStatistic(POINT_TYPE_NAME, POINT_NUMBER_FIELD)
    for new_stat in (new_index_stat, new_type_stat, new_field_stat):
        new_stat.set_tag('test')

    # when
    test_ds.add_statistic(new_index_stat, new_type_stat, new_field_stat)

    # then: each statistic can be looked up on its own
    fetched_index_stat = test_ds.get_index_statistic(DuplicateEntryCountStatistic.STATS_TYPE, 'idx', 'test')
    assert isinstance(fetched_index_stat, DuplicateEntryCountStatistic)
    assert fetched_index_stat.get_tag() == 'test'
    assert fetched_index_stat.get_index_name() == 'idx'

    fetched_type_stat = test_ds.get_data_type_statistic(CountStatistic.STATS_TYPE, POINT_TYPE_NAME, 'test')
    assert isinstance(fetched_type_stat, CountStatistic)
    assert fetched_type_stat.get_tag() == 'test'
    assert fetched_type_stat.get_type_name() == POINT_TYPE_NAME

    fetched_field_stat = test_ds.get_field_statistic(
        NumericRangeStatistic.STATS_TYPE, POINT_TYPE_NAME, POINT_NUMBER_FIELD, 'test')
    assert isinstance(fetched_field_stat, NumericRangeStatistic)
    assert fetched_field_stat.get_tag() == 'test'
    assert fetched_field_stat.get_type_name() == POINT_TYPE_NAME
    assert fetched_field_stat.get_field_name() == POINT_NUMBER_FIELD

    # and: each statistic appears in the matching listing
    listed_for_index = test_ds.get_index_statistics('idx')
    assert any(
        listed.get_tag() == 'test' and isinstance(listed, DuplicateEntryCountStatistic)
        for listed in listed_for_index)

    listed_for_type = test_ds.get_data_type_statistics(POINT_TYPE_NAME)
    assert any(
        listed.get_tag() == 'test' and isinstance(listed, CountStatistic)
        for listed in listed_for_type)

    listed_for_field = test_ds.get_field_statistics(POINT_TYPE_NAME, POINT_NUMBER_FIELD)
    assert any(
        listed.get_tag() == 'test' and isinstance(listed, NumericRangeStatistic)
        for listed in listed_for_field)
def test_stat_recalc_statistics(test_ds):
    """Statistics added empty read as zero until ``recalc_statistic`` fills them from stored data."""
    # given: data is written first, then the statistics are added without values
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().set_name('idx').create_index()
    test_ds.add_type(point_adapter, spatial_index)
    write_test_data(test_ds, spatial_index)

    dup_count_stat = DuplicateEntryCountStatistic('idx')
    type_count_stat = CountStatistic(POINT_TYPE_NAME)
    number_range_stat = NumericRangeStatistic(POINT_TYPE_NAME, POINT_NUMBER_FIELD)
    tracked_stats = (dup_count_stat, type_count_stat, number_range_stat)
    for tracked in tracked_stats:
        tracked.set_tag('test')
    test_ds.add_empty_statistic(*tracked_stats)

    # sanity check: nothing has been computed yet
    assert test_ds.get_statistic_value(dup_count_stat) == 0
    assert test_ds.get_statistic_value(type_count_stat) == 0
    empty_range = test_ds.get_statistic_value(number_range_stat)
    assert empty_range.get_minimum() == 0
    assert empty_range.get_maximum() == 0

    # when
    test_ds.recalc_statistic(*tracked_stats)

    # then
    assert test_ds.get_statistic_value(dup_count_stat) == 0
    assert test_ds.get_statistic_value(type_count_stat) == 360
    computed_range = test_ds.get_statistic_value(number_range_stat)
    assert computed_range.get_minimum() == -180
    assert computed_range.get_maximum() == 179
def test_remove_statistics(test_ds):
    """Statistics that were added and fetched back can be removed, after which lookups give ``None``."""
    # given
    point_adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().set_name('idx').create_index()
    test_ds.add_type(point_adapter, spatial_index)
    write_test_data(test_ds, spatial_index)

    new_index_stat = DuplicateEntryCountStatistic('idx')
    new_type_stat = CountStatistic(POINT_TYPE_NAME)
    new_field_stat = NumericRangeStatistic(POINT_TYPE_NAME, POINT_NUMBER_FIELD)
    for new_stat in (new_index_stat, new_type_stat, new_field_stat):
        new_stat.set_tag('test')
    test_ds.add_statistic(new_index_stat, new_type_stat, new_field_stat)

    # and: all three are present before removal
    stored_index_stat = test_ds.get_index_statistic(DuplicateEntryCountStatistic.STATS_TYPE, 'idx', 'test')
    assert isinstance(stored_index_stat, DuplicateEntryCountStatistic)
    assert stored_index_stat.get_tag() == 'test'
    assert stored_index_stat.get_index_name() == 'idx'

    stored_type_stat = test_ds.get_data_type_statistic(CountStatistic.STATS_TYPE, POINT_TYPE_NAME, 'test')
    assert isinstance(stored_type_stat, CountStatistic)
    assert stored_type_stat.get_tag() == 'test'
    assert stored_type_stat.get_type_name() == POINT_TYPE_NAME

    stored_field_stat = test_ds.get_field_statistic(
        NumericRangeStatistic.STATS_TYPE, POINT_TYPE_NAME, POINT_NUMBER_FIELD, 'test')
    assert isinstance(stored_field_stat, NumericRangeStatistic)
    assert stored_field_stat.get_tag() == 'test'
    assert stored_field_stat.get_type_name() == POINT_TYPE_NAME
    assert stored_field_stat.get_field_name() == POINT_NUMBER_FIELD

    # when: the statistics fetched from the store are the ones handed to remove_statistic
    test_ds.remove_statistic(stored_index_stat, stored_type_stat, stored_field_stat)

    # then
    gone_index_stat = test_ds.get_index_statistic(DuplicateEntryCountStatistic.STATS_TYPE, 'idx', 'test')
    assert gone_index_stat is None
    gone_type_stat = test_ds.get_data_type_statistic(CountStatistic.STATS_TYPE, POINT_TYPE_NAME, 'test')
    assert gone_type_stat is None
    gone_field_stat = test_ds.get_field_statistic(
        NumericRangeStatistic.STATS_TYPE, POINT_TYPE_NAME, POINT_NUMBER_FIELD, 'test')
    assert gone_field_stat is None
def _test_base_options(options, server_side_possible=True):
    """Check the setters/getters that every store options object is exercised with here.

    Boolean options are flipped relative to their current value and read back; the namespace
    and the numeric options are set to fixed values and read back.

    :param options: the store options object under test
    :param server_side_possible: when False, the server-side library flag is expected to read
        back as False no matter which value was set
    """
    options.set_geowave_namespace("test_namespace")
    assert options.get_geowave_namespace() == "test_namespace"

    flipped_persist = not options.is_persist_data_statistics()
    options.set_persist_data_statistics(flipped_persist)
    assert options.is_persist_data_statistics() == flipped_persist

    flipped_secondary = not options.is_secondary_indexing()
    options.set_secondary_indexing(flipped_secondary)
    assert options.is_secondary_indexing() == flipped_secondary

    flipped_block_cache = not options.is_enable_block_cache()
    options.set_enable_block_cache(flipped_block_cache)
    assert options.is_enable_block_cache() == flipped_block_cache

    # the current flag is read first, then secondary indexing is switched off
    # before the server-side library flag is changed
    flipped_server_side = not options.is_server_side_library_enabled()
    options.set_secondary_indexing(False)
    options.set_server_side_library_enabled(flipped_server_side)
    if not server_side_possible:
        assert options.is_server_side_library_enabled() is False
    else:
        assert options.is_server_side_library_enabled() == flipped_server_side

    options.set_max_range_decomposition(42)
    assert options.get_max_range_decomposition() == 42

    options.set_aggregation_max_range_decomposition(43)
    assert options.get_aggregation_max_range_decomposition() == 43

    flipped_visibility = not options.is_visibility_enabled()
    options.set_enable_visibility(flipped_visibility)
    assert options.is_visibility_enabled() == flipped_visibility
def test_accumulo_options():
    """Round-trip the Accumulo-specific options through their accessors, then the shared ones."""
    accumulo = AccumuloOptions()

    # each entry: setter, matching getter, value expected to be read back
    string_round_trips = (
        (accumulo.set_zookeeper, accumulo.get_zookeeper, "test_zookeeper"),
        (accumulo.set_instance, accumulo.get_instance, "test_instance"),
        (accumulo.set_user, accumulo.get_user, "test_user"),
        (accumulo.set_password, accumulo.get_password, "test_password"),
    )
    for setter, getter, value in string_round_trips:
        setter(value)
        assert getter() == value

    flipped_locality_groups = not accumulo.is_use_locality_groups()
    accumulo.set_use_locality_groups(flipped_locality_groups)
    assert accumulo.is_use_locality_groups() == flipped_locality_groups

    _test_base_options(accumulo)
def test_bigtable_options():
    """Round-trip the BigTable-specific options through their accessors, then the shared ones."""
    bigtable = BigTableOptions()

    # each entry: setter, matching getter, value expected to be read back
    round_trips = (
        (bigtable.set_scan_cache_size, bigtable.get_scan_cache_size, 42),
        (bigtable.set_project_id, bigtable.get_project_id, "test_project_id"),
        (bigtable.set_instance_id, bigtable.get_instance_id, "test_instance_id"),
    )
    for setter, getter, value in round_trips:
        setter(value)
        assert getter() == value

    _test_base_options(bigtable)
def test_cassandra_options():
    """Round-trip the Cassandra-specific options through their accessors, then the shared ones.

    The shared checks are run with server-side library support marked as not possible.
    """
    cassandra = CassandraOptions()

    cassandra.set_contact_points("test_contact_point")
    assert cassandra.get_contact_points() == "test_contact_point"

    cassandra.set_datacenter("test_datacenter")
    assert cassandra.get_datacenter() == "test_datacenter"

    cassandra.set_batch_write_size(42)
    assert cassandra.get_batch_write_size() == 42

    flipped_durable_writes = not cassandra.is_durable_writes()
    cassandra.set_durable_writes(flipped_durable_writes)
    assert cassandra.is_durable_writes() == flipped_durable_writes

    cassandra.set_replication_factor(43)
    assert cassandra.get_replication_factor() == 43

    cassandra.set_gc_grace_seconds(44)
    assert cassandra.get_gc_grace_seconds() == 44

    # the table options must come back with the same keys and values
    expected_table_options = {"test_key_1": "test_value_1", "test_key_2": "test_value_2"}
    cassandra.set_table_options(expected_table_options)
    actual_table_options = cassandra.get_table_options()
    assert len(actual_table_options) == len(expected_table_options)
    for option_name in actual_table_options:
        assert (option_name in expected_table_options
                and actual_table_options[option_name] == expected_table_options[option_name])

    cassandra.set_compaction_strategy("TimeWindowCompactionStrategy")
    assert cassandra.get_compaction_strategy() == "TimeWindowCompactionStrategy"

    _test_base_options(cassandra, False)
def test_dynamodb_options():
    """Round-trip the DynamoDB-specific options through their accessors, then the shared ones.

    Region and protocol are additionally reset to ``None`` and expected to read back as ``None``.
    The shared checks are run with server-side library support marked as not possible.
    """
    dynamo = DynamoDBOptions()

    dynamo.set_region("us-east-1")
    assert dynamo.get_region() == "us-east-1"
    dynamo.set_region(None)
    assert dynamo.get_region() is None

    # each entry: setter, matching getter, value expected to be read back
    plain_round_trips = (
        (dynamo.set_endpoint, dynamo.get_endpoint, "test_endpoint"),
        (dynamo.set_write_capacity, dynamo.get_write_capacity, 42),
        (dynamo.set_read_capacity, dynamo.get_read_capacity, 43),
    )
    for setter, getter, value in plain_round_trips:
        setter(value)
        assert getter() == value

    flipped_cache_metadata = not dynamo.is_enable_cache_response_metadata()
    dynamo.set_enable_cache_response_metadata(flipped_cache_metadata)
    assert dynamo.is_enable_cache_response_metadata() == flipped_cache_metadata

    dynamo.set_protocol("HTTP")
    assert dynamo.get_protocol() == "HTTP"
    dynamo.set_protocol(None)
    assert dynamo.get_protocol() is None

    dynamo.set_max_connections(44)
    assert dynamo.get_max_connections() == 44

    _test_base_options(dynamo, False)
def test_hbase_options():
    """Round-trip the HBase-specific options through their accessors, then the shared ones."""
    hbase = HBaseOptions()

    hbase.set_zookeeper("test_zookeeper")
    assert hbase.get_zookeeper() == "test_zookeeper"

    hbase.set_scan_cache_size(42)
    assert hbase.get_scan_cache_size() == 42

    # the server-side library is switched on before the coprocessor flag is flipped
    hbase.set_server_side_library_enabled(True)
    flipped_verify_coprocessors = not hbase.is_verify_coprocessors()
    hbase.set_verify_coprocessors(flipped_verify_coprocessors)
    assert hbase.is_verify_coprocessors() == flipped_verify_coprocessors

    hbase.set_coprocessor_jar("test_jar")
    assert hbase.get_coprocessor_jar() == "test_jar"

    _test_base_options(hbase)
def test_kudu_options():
    """Round-trip the Kudu master option, then run the shared option checks.

    The shared checks are run with server-side library support marked as not possible.
    """
    kudu = KuduOptions()

    master = "test_master"
    kudu.set_kudu_master(master)
    assert kudu.get_kudu_master() == master

    _test_base_options(kudu, False)
def test_redis_options():
    """Round-trip the Redis-specific options through their accessors, then the shared ones.

    The shared checks are run with server-side library support marked as not possible.
    """
    redis = RedisOptions()

    # each entry: setter, matching getter, value expected to be read back
    string_round_trips = (
        (redis.set_address, redis.get_address, "test_address"),
        (redis.set_compression, redis.get_compression, "L4Z"),
        (redis.set_username, redis.get_username, "test_username"),
        (redis.set_password, redis.get_password, "test_password"),
    )
    for setter, getter, value in string_round_trips:
        setter(value)
        assert getter() == value

    _test_base_options(redis, False)
def test_rocksdb_options():
    """Round-trip the RocksDB-specific options through their accessors, then the shared ones.

    The shared checks are run with server-side library support marked as not possible.
    """
    rocksdb = RocksDBOptions()

    rocksdb.set_directory("test_directory")
    assert rocksdb.get_directory() == "test_directory"

    flipped_compact_on_write = not rocksdb.is_compact_on_write()
    rocksdb.set_compact_on_write(flipped_compact_on_write)
    assert rocksdb.is_compact_on_write() == flipped_compact_on_write

    rocksdb.set_batch_write_size(42)
    assert rocksdb.get_batch_write_size() == 42

    _test_base_options(rocksdb, False)
Functions
def test_accumulo_options()
-
Source code
def test_accumulo_options(): options = AccumuloOptions() options.set_zookeeper("test_zookeeper") assert options.get_zookeeper() == "test_zookeeper" options.set_instance("test_instance") assert options.get_instance() == "test_instance" options.set_user("test_user") assert options.get_user() == "test_user" options.set_password("test_password") assert options.get_password() == "test_password" locality_groups = not options.is_use_locality_groups() options.set_use_locality_groups(locality_groups) assert options.is_use_locality_groups() == locality_groups _test_base_options(options)
def test_add_existing_type(test_ds)
-
Source code
def test_add_existing_type(test_ds): # given index = SpatialIndexBuilder().create_index() adapter = POINT_TYPE_ADAPTER test_ds.add_type(adapter, index) # when test_ds.add_type(adapter, index) indices = test_ds.get_indices(adapter.get_type_name()) # then assert len(indices) == 1 assert indices[0].get_name() == index.get_name() assert indices[0].get_index_strategy() == index.get_index_strategy() assert indices[0].get_index_model() == index.get_index_model()
def test_add_get_statistics(test_ds)
-
Source code
def test_add_get_statistics(test_ds): # given adapter = POINT_TYPE_ADAPTER index = SpatialIndexBuilder().set_name('idx').create_index() test_ds.add_type(adapter, index) write_test_data(test_ds, index) duplicate_entry_count_stat = DuplicateEntryCountStatistic('idx') duplicate_entry_count_stat.set_tag('test') count_stat = CountStatistic(POINT_TYPE_NAME) count_stat.set_tag('test') numeric_range_statistic = NumericRangeStatistic(POINT_TYPE_NAME, POINT_NUMBER_FIELD) numeric_range_statistic.set_tag('test') # when test_ds.add_statistic(duplicate_entry_count_stat, count_stat, numeric_range_statistic) # then duplicate_entry_count_stat = test_ds.get_index_statistic(DuplicateEntryCountStatistic.STATS_TYPE, 'idx', 'test') assert isinstance(duplicate_entry_count_stat, DuplicateEntryCountStatistic) assert duplicate_entry_count_stat.get_tag() == 'test' assert duplicate_entry_count_stat.get_index_name() == 'idx' count_stat = test_ds.get_data_type_statistic(CountStatistic.STATS_TYPE, POINT_TYPE_NAME, 'test') assert isinstance(count_stat, CountStatistic) assert count_stat.get_tag() == 'test' assert count_stat.get_type_name() == POINT_TYPE_NAME numeric_range_statistic = test_ds.get_field_statistic( NumericRangeStatistic.STATS_TYPE, POINT_TYPE_NAME, POINT_NUMBER_FIELD, 'test') assert isinstance(numeric_range_statistic, NumericRangeStatistic) assert numeric_range_statistic.get_tag() == 'test' assert numeric_range_statistic.get_type_name() == POINT_TYPE_NAME assert numeric_range_statistic.get_field_name() == POINT_NUMBER_FIELD index_stats = test_ds.get_index_statistics('idx') assert any(stat.get_tag() == 'test' and isinstance(stat, DuplicateEntryCountStatistic) for stat in index_stats) data_type_stats = test_ds.get_data_type_statistics(POINT_TYPE_NAME) assert any(stat.get_tag() == 'test' and isinstance(stat, CountStatistic) for stat in data_type_stats) field_stats = test_ds.get_field_statistics(POINT_TYPE_NAME, POINT_NUMBER_FIELD) assert any(stat.get_tag() == 'test' and isinstance(stat, 
NumericRangeStatistic) for stat in field_stats)
def test_add_type(test_ds)
-
Source code
def test_add_type(test_ds): # given index = SpatialIndexBuilder().create_index() adapter = POINT_TYPE_ADAPTER # when test_ds.add_type(adapter, index) indices = test_ds.get_indices() types = test_ds.get_types() # then assert len(indices) == 1 assert indices[0].get_name() == index.get_name() assert indices[0].get_index_strategy() == index.get_index_strategy() assert indices[0].get_index_model() == index.get_index_model() assert len(types) == 1 assert types[0].get_type_name() == adapter.get_type_name()
def test_bigtable_options()
-
Source code
def test_bigtable_options(): options = BigTableOptions() options.set_scan_cache_size(42) assert options.get_scan_cache_size() == 42 options.set_project_id("test_project_id") assert options.get_project_id() == "test_project_id" options.set_instance_id("test_instance_id") assert options.get_instance_id() == "test_instance_id" _test_base_options(options)
def test_cassandra_options()
-
Source code
def test_cassandra_options(): options = CassandraOptions() options.set_contact_points("test_contact_point") assert options.get_contact_points() == "test_contact_point" options.set_datacenter("test_datacenter") assert options.get_datacenter() == "test_datacenter" options.set_batch_write_size(42) assert options.get_batch_write_size() == 42 durable_writes = not options.is_durable_writes() options.set_durable_writes(durable_writes) assert options.is_durable_writes() == durable_writes options.set_replication_factor(43) assert options.get_replication_factor() == 43 options.set_gc_grace_seconds(44) assert options.get_gc_grace_seconds() == 44 table_options = { "test_key_1": "test_value_1", "test_key_2": "test_value_2" } options.set_table_options(table_options) returned_table_options = options.get_table_options() assert len(returned_table_options) == len(table_options) for key in returned_table_options: assert (key in table_options and returned_table_options[key] == table_options[key]) options.set_compaction_strategy("TimeWindowCompactionStrategy") assert options.get_compaction_strategy() == "TimeWindowCompactionStrategy" _test_base_options(options, False)
def test_copy(test_ds)
-
Source code
def test_copy(test_ds): # given options = RocksDBOptions() options.set_geowave_namespace("geowave.tests") options.set_directory(os.path.join(TEST_DIR, "datastore2")) ds2 = DataStoreFactory.create_data_store(options) adapter = POINT_TYPE_ADAPTER index = SpatialIndexBuilder().create_index() test_ds.add_type(adapter, index) write_test_data(test_ds, index) # when test_ds.copy_to(ds2) indices = ds2.get_indices() types = ds2.get_types() query = VectorQueryBuilder().build() res = results_as_list(ds2.query(query)) # then assert len(test_ds.get_indices()) == 1 assert len(indices) == 1 assert indices[0].get_name() == index.get_name() assert indices[0].get_index_strategy() == index.get_index_strategy() assert indices[0].get_index_model() == index.get_index_model() assert len(types) == 1 assert types[0].get_type_name() == adapter.get_type_name() assert len(res) == len(TEST_DATA) ds2.delete_all()
def test_copy_by_query(test_ds)
-
Source code
def test_copy_by_query(test_ds): # given options = RocksDBOptions() options.set_geowave_namespace("geowave.tests") options.set_directory(os.path.join(TEST_DIR, "datastore2")) ds2 = DataStoreFactory.create_data_store(options) adapter = POINT_TYPE_ADAPTER index = SpatialIndexBuilder().create_index() test_ds.add_type(adapter, index) write_test_data(test_ds, index) # when qbldr = VectorQueryBuilder() constraints_factory = qbldr.constraints_factory() # filter encompasses 10 features (1, 1) - (10, 10) constraints = constraints_factory.cql_constraints("BBOX(the_geom, 0.5, 0.5, 10.5, 10.5)") qbldr.all_indices().constraints(constraints) test_ds.copy_to(ds2, qbldr.build()) indices = ds2.get_indices() types = ds2.get_types() query = VectorQueryBuilder().build() res = results_as_list(ds2.query(query)) # then assert len(test_ds.get_indices()) == 1 assert len(indices) == 1 assert indices[0].get_name() == index.get_name() assert indices[0].get_index_strategy() == index.get_index_strategy() assert indices[0].get_index_model() == index.get_index_model() assert len(types) == 1 assert types[0].get_type_name() == adapter.get_type_name() assert len(res) == 10 ds2.delete_all()
def test_create_writer(test_ds)
-
Source code
def test_create_writer(test_ds): # given adapter = POINT_TYPE_ADAPTER index = SpatialIndexBuilder().create_index() test_ds.add_type(adapter, index) # when writer = test_ds.create_writer(adapter.get_type_name()) # then assert writer is not None
def test_create_writer_null(test_ds)
-
Source code
def test_create_writer_null(test_ds): # when writer = test_ds.create_writer("Corgi") # then assert writer is None
def test_create_writer_null_other(test_ds)
-
Source code
def test_create_writer_null_other(test_ds): # given adapter = POINT_TYPE_ADAPTER index = SpatialIndexBuilder().create_index() test_ds.add_type(adapter, index) test_ds.create_writer(adapter.get_type_name()) # when writer = test_ds.create_writer("Corgi") # then assert writer is None
def test_delete(test_ds)
-
Source code
def test_delete(test_ds): # given index = SpatialIndexBuilder().set_name("idx1").create_index() index2 = SpatialIndexBuilder().set_name("idx2").create_index() adapter = POINT_TYPE_ADAPTER test_ds.add_type(adapter, index) test_ds.add_type(adapter, index2) write_test_data(test_ds, index, index2) # when qbldr = VectorQueryBuilder() constraints_factory = qbldr.constraints_factory() # filter encompasses 10 features (1, 1) - (10, 10) constraints = constraints_factory.cql_constraints("BBOX(the_geom, 0.5, 0.5, 10.5, 10.5)") qbldr.constraints(constraints) test_ds.delete(qbldr.build()) query = VectorQueryBuilder().build() res = results_as_list(test_ds.query(query)) # then assert len(test_ds.get_indices()) == 2 assert len(test_ds.get_types()) == 1 assert len(res) == (len(TEST_DATA) - 10)
def test_delete_all(test_ds)
-
Source code
def test_delete_all(test_ds): # given index = SpatialIndexBuilder().set_name("idx1").create_index() index2 = SpatialIndexBuilder().set_name("idx2").create_index() adapter = POINT_TYPE_ADAPTER test_ds.add_type(adapter, index) test_ds.add_type(adapter, index2) write_test_data(test_ds, index, index2) # when test_ds.delete_all() query = VectorQueryBuilder().build() res = results_as_list(test_ds.query(query)) # then assert len(test_ds.get_indices()) == 0 assert len(test_ds.get_types()) == 0 assert len(res) == 0
def test_dynamodb_options()
-
Source code
def test_dynamodb_options(): options = DynamoDBOptions() options.set_region("us-east-1") assert options.get_region() == "us-east-1" options.set_region(None) assert options.get_region() is None options.set_endpoint("test_endpoint") assert options.get_endpoint() == "test_endpoint" options.set_write_capacity(42) assert options.get_write_capacity() == 42 options.set_read_capacity(43) assert options.get_read_capacity() == 43 enable_cache_response_metadata = not options.is_enable_cache_response_metadata() options.set_enable_cache_response_metadata(enable_cache_response_metadata) assert options.is_enable_cache_response_metadata() == enable_cache_response_metadata options.set_protocol("HTTP") assert options.get_protocol() == "HTTP" options.set_protocol(None) assert options.get_protocol() is None options.set_max_connections(44) assert options.get_max_connections() == 44 _test_base_options(options, False)
def test_hbase_options()
-
Source code
def test_hbase_options(): options = HBaseOptions() options.set_zookeeper("test_zookeeper") assert options.get_zookeeper() == "test_zookeeper" options.set_scan_cache_size(42) assert options.get_scan_cache_size() == 42 options.set_server_side_library_enabled(True) verify_coprocessors = not options.is_verify_coprocessors() options.set_verify_coprocessors(verify_coprocessors) assert options.is_verify_coprocessors() == verify_coprocessors options.set_coprocessor_jar("test_jar") assert options.get_coprocessor_jar() == "test_jar" _test_base_options(options)
def test_kudu_options()
-
Source code
def test_kudu_options(): options = KuduOptions() options.set_kudu_master("test_master") assert options.get_kudu_master() == "test_master" _test_base_options(options, False)
def test_redis_options()
-
Source code
def test_redis_options(): options = RedisOptions() options.set_address("test_address") assert options.get_address() == "test_address" options.set_compression("L4Z") assert options.get_compression() == "L4Z" options.set_username("test_username") assert options.get_username() == "test_username" options.set_password("test_password") assert options.get_password() == "test_password" _test_base_options(options, False)
def test_remove_index(test_ds)
-
Source code
def test_remove_index(test_ds): # given index = SpatialIndexBuilder().set_name("idx1").create_index() index2 = SpatialIndexBuilder().set_name("idx2").create_index() adapter = POINT_TYPE_ADAPTER test_ds.add_type(adapter, index) test_ds.add_type(adapter, index2) # when test_ds.remove_index(adapter.get_type_name(), index.get_name()) indices = test_ds.get_indices() # then assert len(indices) == 1 assert indices[0].get_name() == index2.get_name() assert indices[0].get_index_strategy() == index2.get_index_strategy() assert indices[0].get_index_model() == index2.get_index_model()
def test_remove_index_last(test_ds)
-
Source code
def test_remove_index_last(test_ds):
    """Removing the only index of a registered type must raise.

    The setup runs outside the ``pytest.raises`` context so that only the
    ``remove_index`` call can satisfy the expected exception; a failure while
    registering the type surfaces as a normal test error instead.
    """
    # given
    index = SpatialIndexBuilder().create_index()
    adapter = POINT_TYPE_ADAPTER
    test_ds.add_type(adapter, index)

    # when
    # `exc_info` rather than `exec`, which would shadow the Python builtin.
    with pytest.raises(Exception) as exc_info:
        test_ds.remove_index(index.get_name())

    # then
    # Checked after the context exits: `.value` is populated only once the
    # exception has been captured.
    assert 'Adapters require at least one index' in str(exc_info.value)
def test_remove_index_non_exist(test_ds)
-
Source code
def test_remove_index_non_exist(test_ds):
    """Removing an index name this test never registered leaves one index behind."""
    # given: a single index registered for the point type
    spatial_index = SpatialIndexBuilder().create_index()
    adapter = POINT_TYPE_ADAPTER
    test_ds.add_type(adapter, spatial_index)

    # when: an index name that was not added above is removed
    test_ds.remove_index("Corgi")

    # then
    remaining_indices = test_ds.get_indices()
    assert len(remaining_indices) == 1
def test_remove_statistics(test_ds)
-
Source code
def test_remove_statistics(test_ds):
    """Statistics added to the store can be read back and are gone after removal.

    Covers one index statistic, one data-type statistic and one field
    statistic, all sharing the tag ``'test'``.
    """
    # given: a populated store with three tagged statistics
    adapter = POINT_TYPE_ADAPTER
    index = SpatialIndexBuilder().set_name('idx').create_index()
    test_ds.add_type(adapter, index)
    write_test_data(test_ds, index)

    new_index_stat = DuplicateEntryCountStatistic('idx')
    new_index_stat.set_tag('test')
    new_type_stat = CountStatistic(POINT_TYPE_NAME)
    new_type_stat.set_tag('test')
    new_field_stat = NumericRangeStatistic(POINT_TYPE_NAME, POINT_NUMBER_FIELD)
    new_field_stat.set_tag('test')
    test_ds.add_statistic(new_index_stat, new_type_stat, new_field_stat)

    # and: each statistic can be fetched back with the expected identity
    stored_index_stat = test_ds.get_index_statistic(
        DuplicateEntryCountStatistic.STATS_TYPE, 'idx', 'test')
    assert isinstance(stored_index_stat, DuplicateEntryCountStatistic)
    assert stored_index_stat.get_tag() == 'test'
    assert stored_index_stat.get_index_name() == 'idx'

    stored_type_stat = test_ds.get_data_type_statistic(
        CountStatistic.STATS_TYPE, POINT_TYPE_NAME, 'test')
    assert isinstance(stored_type_stat, CountStatistic)
    assert stored_type_stat.get_tag() == 'test'
    assert stored_type_stat.get_type_name() == POINT_TYPE_NAME

    stored_field_stat = test_ds.get_field_statistic(
        NumericRangeStatistic.STATS_TYPE, POINT_TYPE_NAME, POINT_NUMBER_FIELD, 'test')
    assert isinstance(stored_field_stat, NumericRangeStatistic)
    assert stored_field_stat.get_tag() == 'test'
    assert stored_field_stat.get_type_name() == POINT_TYPE_NAME
    assert stored_field_stat.get_field_name() == POINT_NUMBER_FIELD

    # when: the statistics fetched from the store are removed
    test_ds.remove_statistic(stored_index_stat, stored_type_stat, stored_field_stat)

    # then: none of the three lookups finds anything
    missing_index_stat = test_ds.get_index_statistic(
        DuplicateEntryCountStatistic.STATS_TYPE, 'idx', 'test')
    assert missing_index_stat is None
    missing_type_stat = test_ds.get_data_type_statistic(
        CountStatistic.STATS_TYPE, POINT_TYPE_NAME, 'test')
    assert missing_type_stat is None
    missing_field_stat = test_ds.get_field_statistic(
        NumericRangeStatistic.STATS_TYPE, POINT_TYPE_NAME, POINT_NUMBER_FIELD, 'test')
    assert missing_field_stat is None
def test_remove_type(test_ds)
-
Source code
def test_remove_type(test_ds):
    """Removing a type drops its data and its index association.

    The index itself is expected to remain registered in the store.
    """
    # given: the point type registered and populated
    spatial_index = SpatialIndexBuilder().create_index()
    adapter = POINT_TYPE_ADAPTER
    test_ds.add_type(adapter, spatial_index)
    write_test_data(test_ds, spatial_index)

    # when
    test_ds.remove_type(adapter.get_type_name())
    match_all_query = VectorQueryBuilder().build()
    query_results = results_as_list(test_ds.query(match_all_query))

    # then
    type_indices = test_ds.get_indices(adapter.get_type_name())
    assert len(type_indices) == 0
    all_indices = test_ds.get_indices()
    assert len(all_indices) == 1
    assert len(query_results) == 0
def test_rocksdb_options()
-
Source code
def test_rocksdb_options():
    """Round-trip the RocksDB-specific options through their setters and getters.

    Finishes by handing the options object to the shared base-option checks.
    """
    rocksdb_opts = RocksDBOptions()

    directory = "test_directory"
    rocksdb_opts.set_directory(directory)
    assert rocksdb_opts.get_directory() == directory

    # Flip the boolean from whatever it currently is so the setter must take effect.
    flipped_compact_flag = not rocksdb_opts.is_compact_on_write()
    rocksdb_opts.set_compact_on_write(flipped_compact_flag)
    assert rocksdb_opts.is_compact_on_write() == flipped_compact_flag

    rocksdb_opts.set_batch_write_size(42)
    assert rocksdb_opts.get_batch_write_size() == 42

    _test_base_options(rocksdb_opts, False)
def test_stat_recalc_statistics(test_ds)
-
Source code
def test_stat_recalc_statistics(test_ds):
    """Empty statistics start at zero and pick up real values after a recalc."""
    # given: a populated store with three tagged statistics added empty
    adapter = POINT_TYPE_ADAPTER
    index = SpatialIndexBuilder().set_name('idx').create_index()
    test_ds.add_type(adapter, index)
    write_test_data(test_ds, index)

    index_stat = DuplicateEntryCountStatistic('idx')
    index_stat.set_tag('test')
    type_stat = CountStatistic(POINT_TYPE_NAME)
    type_stat.set_tag('test')
    field_stat = NumericRangeStatistic(POINT_TYPE_NAME, POINT_NUMBER_FIELD)
    field_stat.set_tag('test')
    test_ds.add_empty_statistic(index_stat, type_stat, field_stat)

    # Before recalculation every value is still zero.
    assert test_ds.get_statistic_value(index_stat) == 0
    assert test_ds.get_statistic_value(type_stat) == 0
    range_before = test_ds.get_statistic_value(field_stat)
    assert range_before.get_minimum() == 0
    assert range_before.get_maximum() == 0

    # when
    test_ds.recalc_statistic(index_stat, type_stat, field_stat)

    # then: the statistics now hold the values computed from the written data
    assert test_ds.get_statistic_value(index_stat) == 0
    assert test_ds.get_statistic_value(type_stat) == 360
    range_after = test_ds.get_statistic_value(field_stat)
    assert range_after.get_minimum() == -180
    assert range_after.get_maximum() == 179
def test_write(test_ds)
-
Source code
def test_write(test_ds):
    """Every record written to the store comes back from an unconstrained query."""
    # given: the point type registered against a spatial index
    adapter = POINT_TYPE_ADAPTER
    spatial_index = SpatialIndexBuilder().create_index()
    test_ds.add_type(adapter, spatial_index)

    # when
    write_test_data(test_ds, spatial_index)
    match_all_query = VectorQueryBuilder().build()
    query_results = results_as_list(test_ds.query(match_all_query))

    # then
    assert len(query_results) == len(TEST_DATA)