Skip to content

Commit b53074b

Browse files
committed
Provision and test against remote ES instance (#184)
* provision and test against remote ES instance This commit adds the functionality to allow the testing framework to provision and run the tests against a remotely running Elasticsearch instance. The '-p' command line option has been modified to take an optional argument, the URL - optionally with credentials - of the ES instance to use. * mask actual used password in stdout message Replace password with star character. (cherry picked from commit eec7dd7)
1 parent cc0ccc7 commit b53074b

File tree

4 files changed

+78
-56
lines changed

4 files changed

+78
-56
lines changed

test/integration/data.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
from elasticsearch import Elasticsearch
1818

19-
REQ_AUTH = ("elastic", Elasticsearch.AUTH_PASSWORD)
20-
2119
TABLEAU_DATASET_BASE_URL = "https://raw.githubusercontent.com/elastic/connector-plugin-sdk/120fe213c4bce30d9424c155fbd9b2ad210239e0/tests/datasets/TestV1/"
2220

2321
CALCS_TEMPLATE =\
@@ -282,14 +280,16 @@ class TestData(object):
282280
_csv_header = None
283281
_csv_lines = None
284282

283+
_es = None
285284
_offline_dir = None
286285
_mode = None
287286

288-
def __init__(self, mode=MODE_INDEX, offline_dir=None):
287+
def __init__(self, es, mode=MODE_INDEX, offline_dir=None):
289288
self._csv_md5 = {}
290289
self._csv_header = {}
291290
self._csv_lines = {}
292291

292+
self._es = es
293293
self._offline_dir = offline_dir
294294
self._mode = mode
295295

@@ -376,8 +376,8 @@ def _prepare_tableau_load(self, file_name, index_name, index_template):
376376
ndjson = self._get_csv_as_ndjson(TABLEAU_DATASET_BASE_URL, file_name, index_name)
377377

378378
if self.MODE_NOINDEX < self._mode:
379-
with requests.put("http://localhost:%s/_template/%s_template" % (Elasticsearch.ES_PORT, index_name),
380-
json=index_template, auth=REQ_AUTH) as req:
379+
with requests.put("%s/_template/%s_template" % (self._es.base_url(), index_name),
380+
json=index_template, auth=self._es.credentials()) as req:
381381
if req.status_code != 200:
382382
raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
383383
req.status_code, req.text))
@@ -386,16 +386,17 @@ def _prepare_tableau_load(self, file_name, index_name, index_template):
386386

387387
def _post_ndjson(self, ndjsons, index_name, pipeline_name=None):
388388
print("Indexing data for index '%s'." % index_name)
389-
url = "http://localhost:%s/%s/_doc/_bulk" % (Elasticsearch.ES_PORT, index_name)
389+
url = "%s/%s/_doc/_bulk" % (self._es.base_url(), index_name)
390390
if pipeline_name:
391391
url += "?pipeline=%s" % pipeline_name
392392
if type(ndjsons) is not list:
393393
ndjsons = [ndjsons]
394394
for n in ndjsons:
395-
with requests.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"}, auth=REQ_AUTH) as req:
395+
with requests.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"},
396+
auth=self._es.credentials()) as req:
396397
if req.status_code != 200:
397-
raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name, req.status_code,
398-
req.text))
398+
raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name,
399+
req.status_code, req.text))
399400
reply = json.loads(req.text)
400401
if reply["errors"]:
401402
raise Exception("bulk POST to %s failed with content: %s" % (index_name, req.text))
@@ -405,8 +406,8 @@ def _wait_for_results(self, index_name):
405406
hits = 0
406407
waiting_since = time.time()
407408
while hits < MIN_INDEXED_DOCS:
408-
url = "http://localhost:%s/%s/_search" % (Elasticsearch.ES_PORT, index_name)
409-
req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=REQ_AUTH)
409+
url = "%s/%s/_search" % (self._es.base_url(), index_name)
410+
req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials())
410411
if req.status_code != 200:
411412
raise Exception("failed to _search %s: code: %s, body: %s" % (index_name, req.status_code, req.text))
412413
answer = json.loads(req.text)
@@ -420,8 +421,8 @@ def _delete_if_needed(self, index_name):
420421
return
421422
print("Deleting any old index '%s'." % index_name);
422423

423-
url = "http://localhost:%s/%s" % (Elasticsearch.ES_PORT, index_name)
424-
with requests.delete(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=REQ_AUTH) as req:
424+
url = "%s/%s" % (self._es.base_url(), index_name)
425+
with requests.delete(url, timeout = Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials()) as req:
425426
if req.status_code != 200 and req.status_code != 404:
426427
raise Exception("Deleting index %s failed; code=%s, body: %s." %
427428
(index_name, req.status_code, req.text))
@@ -433,8 +434,8 @@ def _load_tableau_sample(self, file_name, index_name, template, pipeline=None):
433434
self._delete_if_needed(index_name)
434435

435436
if pipeline:
436-
with requests.put("http://localhost:%s/_ingest/pipeline/parse_%s" % (Elasticsearch.ES_PORT,
437-
index_name), json=pipeline, auth=REQ_AUTH) as req:
437+
with requests.put("%s/_ingest/pipeline/parse_%s" % (self._es.base_url(), index_name),
438+
json=pipeline, auth=self._es.credentials()) as req:
438439
if req.status_code != 200:
439440
raise Exception("PUT %s pipeline failed with code: %s (content: %s) " % (index_name,
440441
req.status_code, req.text))
@@ -483,8 +484,8 @@ def _put_sample_template(self, sample_name, index_name):
483484
# turn it to JSON (to deal with trailing commas past last member on a level
484485
mapping = eval(mapping)
485486
# PUT the built template
486-
url = "http://localhost:%s/_template/%s_template" % (Elasticsearch.ES_PORT, index_name)
487-
with requests.put(url, json=mapping, auth=REQ_AUTH, timeout=Elasticsearch.REQ_TIMEOUT) as req:
487+
url = "%s/_template/%s_template" % (self._es.base_url(), index_name)
488+
with requests.put(url, json=mapping, auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
488489
if req.status_code != 200:
489490
raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
490491
req.status_code, req.text))

test/integration/elasticsearch.py

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import signal
1919
import subprocess
2020
from subprocess import PIPE
21+
from urllib.parse import urlparse
2122

2223
ARTIF_URL = "https://artifacts-api.elastic.co/v1/versions"
2324
ES_PROJECT = "elasticsearch"
@@ -33,19 +34,39 @@ class Elasticsearch(object):
3334
TERM_TIMEOUT = 5 # how long to wait for processes to die (before KILLing)
3435
REQ_TIMEOUT = 20 # default GET request timeout
3536
ES_PORT = 9200
37+
ES_CREDENTIALS = ("elastic", "elastic") # user, pwd
38+
ES_BASE_URL = "http://localhost:%s" % ES_PORT
3639
ES_START_TIMEOUT = 60 # how long to wait for Elasticsearch to come online
3740
ES_401_RETRIES = 8 # how many "starting" 401 answers to accept before giving up (.5s waiting inbetween)
38-
AUTH_PASSWORD = "elastic"
3941

4042
_offline_dir = None
43+
_credentials = None
44+
_base_url = None
4145

42-
def __init__(self, offline_dir=None):
46+
def __init__(self, offline_dir=None, url=None):
4347
self._offline_dir = offline_dir
48+
if not url:
49+
self._port = self.ES_PORT
50+
self._base_url = self.ES_BASE_URL
51+
self._credentials = self.ES_CREDENTIALS
52+
else:
53+
u = urlparse(url)
54+
self._port = u.port if u.port else self.ES_PORT
55+
self._base_url = "%s://%s:%s" % (u.scheme, u.hostname, self._port)
56+
self._credentials = (u.username, u.password) if u.username else self.ES_CREDENTIALS
57+
print("Using Elasticsearch instance at %s, credentials (%s, %s)" % (self._base_url, self._credentials[0],
58+
"*" * len(self._credentials[1])))
4459

4560
@staticmethod
4661
def elasticsearch_distro_filename(version):
4762
return "%s-%s%s.%s" % (ES_PROJECT, version, ES_ARCH, PACKAGING)
4863

64+
def credentials(self):
65+
return self._credentials
66+
67+
def base_url(self):
68+
return self._base_url
69+
4970
def _latest_build(self, version):
5071
req = requests.get(ARTIF_URL, timeout=self.REQ_TIMEOUT)
5172
vers = req.json()["versions"]
@@ -124,7 +145,7 @@ def _update_es_yaml(self, es_dir):
124145
with open(yaml, mode="a", newline="\n") as f:
125146
f.write("#\n# ODBC Integration Test\n#\n")
126147
f.write("xpack.security.enabled: True\n")
127-
f.write("http.port: %s\n" % self.ES_PORT) # don't bind on next avail port
148+
f.write("http.port: %s\n" % self._port) # don't bind on next avail port
128149
f.write("cluster.routing.allocation.disk.threshold_enabled: False\n")
129150

130151
@staticmethod
@@ -196,22 +217,21 @@ def _enable_xpack(self, es_dir):
196217
# setup passwords to random generated ones first...
197218
pwd = self._gen_passwords(es_dir)
198219
# ...then change passwords, easier to restart with failed tests
199-
req = requests.post("http://localhost:%s/_security/user/_password" % self.ES_PORT, auth=("elastic", pwd),
200-
json={"password": self.AUTH_PASSWORD})
220+
req = requests.post("%s/_security/user/_password" % self._base_url, auth=(self._credentials[0], pwd),
221+
json={"password": self._credentials[1]})
201222
if req.status_code != 200:
202223
raise Exception("attempt to change elastic's password failed with code %s" % req.status_code)
203224
# kibana too (debug convenience)
204-
req = requests.post("http://localhost:%s/_security/user/kibana/_password" % self.ES_PORT,
205-
auth=("elastic", self.AUTH_PASSWORD), json={"password": self.AUTH_PASSWORD})
225+
req = requests.post("%s/_security/user/kibana/_password" % self._base_url, auth=self._credentials,
226+
json={"password": self._credentials[1]})
206227
if req.status_code != 200:
207228
print("ERROR: kibana user password change failed with code: %s" % req.status_code)
208229

209230
# start trial mode
210-
auth = ("elastic", self.AUTH_PASSWORD)
211-
url = "http://localhost:%s/_license/start_trial?acknowledge=true" % self.ES_PORT
231+
url = "%s/_license/start_trial?acknowledge=true" % self._base_url
212232
failures = 0
213233
while True:
214-
req = requests.post(url, auth=auth, timeout=self.REQ_TIMEOUT)
234+
req = requests.post(url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
215235
if req.status_code == 200:
216236
# TODO: check content?
217237
break
@@ -246,7 +266,7 @@ def reset(self, es_dir):
246266
raise Exception()
247267
except:
248268
raise Exception("port %s is active; if Elasticsearch is running it needs to be shut down first" %
249-
self.ES_PORT)
269+
self._port)
250270

251271
data_path = os.path.join(es_dir, "data")
252272
if os.path.isdir(data_path):
@@ -260,24 +280,21 @@ def reset(self, es_dir):
260280
self._start_elasticsearch(es_dir)
261281
self._enable_xpack(es_dir)
262282

263-
@staticmethod
264-
def cluster_name(password=None):
265-
auth = ("elastic", password) if password else None
283+
def cluster_name(self, fail_on_non200=True):
266284
try:
267-
resp = requests.get("http://localhost:%s" % Elasticsearch.ES_PORT, auth=auth, timeout=.5)
285+
resp = requests.get(self._base_url, auth=self._credentials, timeout=self.REQ_TIMEOUT)
268286
except (requests.Timeout, requests.ConnectionError):
269287
return None
270288
if resp.status_code != 200:
271-
if password:
289+
if fail_on_non200:
272290
raise Exception("unexpected ES response code received: %s" % resp.status_code)
273291
else:
274292
return ""
275293
if "cluster_name" not in resp.json():
276294
raise Exception("unexpected ES answer received: %s" % resp.text)
277295
return resp.json().get("cluster_name")
278296

279-
@staticmethod
280-
def is_listening(password=None):
281-
return Elasticsearch.cluster_name(password) is not None
297+
def is_listening(self):
298+
return self.cluster_name(False) is not None
282299

283300
# vim: set noet fenc=utf-8 ff=dos sts=0 sw=4 ts=4 tw=118 :

test/integration/ites.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@
1010
import argparse
1111
import os, sys, re, time
1212

13-
from elasticsearch import Elasticsearch #spawn_elasticsearch, reset_elasticsearch, es_is_listening, AUTH_PASSWORD
13+
from elasticsearch import Elasticsearch
1414
from data import TestData
1515
from install import Installer
1616
from testing import Testing
1717

1818

1919
def ites(args):
20-
es = Elasticsearch(args.offline_dir)
20+
es = Elasticsearch(args.offline_dir, args.url)
2121

2222
# create a running instance of Elasticsearch if needed
23-
if not args.pre_staged:
23+
if args.url is None:
2424
if args.es_reset:
2525
es_dir = os.path.abspath(args.es_reset)
2626
es.reset(es_dir)
@@ -39,7 +39,7 @@ def ites(args):
3939
"version: %s)" % (args.driver, args.version))
4040

4141
es.spawn(version, root_dir, args.ephemeral)
42-
elif not es.is_listening(Elasticsearch.AUTH_PASSWORD):
42+
elif not es.is_listening():
4343
raise Exception("no running prestaged Elasticsearch instance found.")
4444
else:
4545
print("Using pre-staged Elasticsearch.")
@@ -53,7 +53,7 @@ def ites(args):
5353
else:
5454
test_mode = TestData.MODE_INDEX
5555

56-
data = TestData(test_mode, args.offline_dir)
56+
data = TestData(es, test_mode, args.offline_dir)
5757
data.load()
5858

5959
# install the driver
@@ -65,13 +65,13 @@ def ites(args):
6565
# run the tests
6666
if not args.skip_tests:
6767
assert(data is not None)
68-
cluster_name = es.cluster_name(Elasticsearch.AUTH_PASSWORD)
68+
cluster_name = es.cluster_name()
6969
assert(len(cluster_name))
7070
if args.dsn:
71-
Testing(data, cluster_name, args.dsn).perform()
71+
Testing(es, data, cluster_name, args.dsn).perform()
7272
else:
73-
Testing(data, cluster_name, "Packing=JSON;").perform()
74-
Testing(data, cluster_name, "Packing=CBOR;").perform()
73+
Testing(es, data, cluster_name, "Packing=JSON;").perform()
74+
Testing(es, data, cluster_name, "Packing=CBOR;").perform()
7575

7676
def main():
7777
parser = argparse.ArgumentParser(description='Integration Testing with Elasticsearch.')
@@ -80,8 +80,8 @@ def main():
8080
stage_grp.add_argument("-r", "--root-dir", help="Root directory to [temporarily] stage Elasticsearch into.")
8181
stage_grp.add_argument("-s", "--es-reset", help="Path to an already configured Elasticsearch folder to "
8282
"use; data directory content will be removed; 'ephemeral' will be ignored.")
83-
stage_grp.add_argument("-p", "--pre-staged", help="Use a pre-staged and running Elasticsearch instance",
84-
action="store_true", default=False)
83+
stage_grp.add_argument("-p", "--url", help="Use a pre-staged and running Elasticsearch instance. If no URL is "
84+
"provided, %s is assumed." % Elasticsearch.ES_BASE_URL, nargs="?", const="")
8585

8686
parser.add_argument("-d", "--driver", help="The path to the driver file to test; if not provided, the driver "
8787
"is assumed to have been installed.")
@@ -103,10 +103,10 @@ def main():
103103
"default.")
104104

105105
args = parser.parse_args()
106-
if not (args.root_dir or args.es_reset or args.pre_staged):
106+
if not (args.root_dir or args.es_reset or args.url is not None):
107107
parser.error("no Elasticsearch instance or root/staged directory provided.")
108108

109-
if not (args.driver or args.version or args.es_reset or args.pre_staged):
109+
if not (args.driver or args.version or args.es_reset or args.url is not None):
110110
parser.error("don't know what Elasticsearch version to test against.")
111111

112112
if args.driver and args.dsn and "Driver=" in args.dsn:

test/integration/testing.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,32 @@
1414
from elasticsearch import Elasticsearch
1515
from data import TestData, BATTERS_TEMPLATE
1616

17-
UID = "elastic"
18-
CONNECT_STRING = 'Driver={Elasticsearch Driver};UID=%s;PWD=%s;Secure=0;' % (UID, Elasticsearch.AUTH_PASSWORD)
19-
CATALOG = "distribution_run" # source built, "elasticsearch": nightly builds
17+
DRIVER_NAME = "Elasticsearch Driver"
2018

2119
class Testing(unittest.TestCase):
2220

21+
_uid = None
2322
_data = None
2423
_dsn = None
2524
_pyodbc = None
2625
_catalog = None
2726

28-
def __init__(self, test_data, catalog=CATALOG, dsn=None):
27+
def __init__(self, es, test_data, catalog, dsn=None):
2928
super().__init__()
29+
uid, pwd = es.credentials()
30+
31+
self._uid = uid
3032
self._data = test_data
3133
self._catalog = catalog
34+
35+
conn_str = "Driver={%s};UID=%s;PWD=%s;Secure=0;" % (DRIVER_NAME, uid, pwd)
3236
if dsn:
3337
if "Driver=" not in dsn:
34-
self._dsn = CONNECT_STRING + dsn
38+
self._dsn = conn_str + dsn
3539
else:
3640
self._dsn = dsn
3741
else:
38-
self._dsn = CONNECT_STRING
42+
self._dsn = conn_str
3943
print("Using DSN: '%s'." % self._dsn)
4044

4145
# only import pyODBC if running tests (vs. for instance only loading test data in ES)
@@ -324,7 +328,7 @@ def _proto_tests(self):
324328
cnxn.clear_output_converters()
325329

326330
def perform(self):
327-
self._check_info(self._pyodbc.SQL_USER_NAME, UID)
331+
self._check_info(self._pyodbc.SQL_USER_NAME, self._uid)
328332
self._check_info(self._pyodbc.SQL_DATABASE_NAME, self._catalog)
329333

330334
# simulate catalog querying as apps do in ES/GH#40775 do

0 commit comments

Comments
 (0)