1818import signal
1919import subprocess
2020from subprocess import PIPE
21+ from urllib .parse import urlparse
2122
2223ARTIF_URL = "https://artifacts-api.elastic.co/v1/versions"
2324ES_PROJECT = "elasticsearch"
@@ -32,19 +33,40 @@ class Elasticsearch(object):
3233 TERM_TIMEOUT = 5 # how long to wait for processes to die (before KILLing)
3334 REQ_TIMEOUT = 20 # default GET request timeout
3435 ES_PORT = 9200
36+ ES_BASE_URL = "http://localhost:%s" % ES_PORT
3537 ES_START_TIMEOUT = 60 # how long to wait for Elasticsearch to come online
3638 ES_401_RETRIES = 8 # how many "starting" 401 answers to accept before giving up (.5s waiting inbetween)
3739 AUTH_PASSWORD = "elastic"
40+ ES_CREDENTIALS = ("elastic" , AUTH_PASSWORD ) # user, pwd
3841
3942 _offline_dir = None
43+ _credentials = None
44+ _base_url = None
4045
41- def __init__ (self , offline_dir = None ):
46+ def __init__ (self , offline_dir = None , url = None ):
4247 self ._offline_dir = offline_dir
48+ if not url :
49+ self ._port = self .ES_PORT
50+ self ._base_url = self .ES_BASE_URL
51+ self ._credentials = self .ES_CREDENTIALS
52+ else :
53+ u = urlparse (url )
54+ self ._port = u .port if u .port else self .ES_PORT
55+ self ._base_url = "%s://%s:%s" % (u .scheme , u .hostname , self ._port )
56+ self ._credentials = (u .username , u .password ) if u .username else self .ES_CREDENTIALS
57+ print ("Using Elasticsearch instance at %s, credentials (%s, %s)" % (self ._base_url , self ._credentials [0 ],
58+ "*" * len (self ._credentials [1 ])))
4359
4460 @staticmethod
4561 def elasticsearch_distro_filename (version ):
4662 return "%s-%s.%s" % (ES_PROJECT , version , PACKAGING )
4763
64+ def credentials (self ):
65+ return self ._credentials
66+
67+ def base_url (self ):
68+ return self ._base_url
69+
4870 def _latest_build (self , version ):
4971 req = requests .get (ARTIF_URL , timeout = self .REQ_TIMEOUT )
5072 vers = req .json ()["versions" ]
@@ -123,7 +145,7 @@ def _update_es_yaml(self, es_dir):
123145 with open (yaml , mode = "a" , newline = "\n " ) as f :
124146 f .write ("#\n # ODBC Integration Test\n #\n " )
125147 f .write ("xpack.security.enabled: True\n " )
126- f .write ("http.port: %s\n " % self .ES_PORT ) # don't bind on next avail port
148+ f .write ("http.port: %s\n " % self ._port ) # don't bind on next avail port
127149 f .write ("cluster.routing.allocation.disk.threshold_enabled: False\n " )
128150
129151 @staticmethod
@@ -185,24 +207,33 @@ def _gen_passwords(self, es_dir):
185207 raise Exception ("password generation failed with: %s" % p .stdout .read ())
186208
187209 def _enable_xpack (self , es_dir ):
188- # start trial mode
189- req = requests .post ("http://localhost:%s/_license/start_trial?acknowledge=true" % self .ES_PORT )
190- if req .status_code != 200 :
191- raise Exception ("starting of trial failed with status: %s" % req .status_code )
192- # TODO: check content?
193-
210+ # setup passwords to random generated ones first...
194211 pwd = self ._gen_passwords (es_dir )
195- # change passwords, easier to restart with failed tests
196- req = requests .post ("http://localhost: %s/_security/user/_password" % self .ES_PORT , auth = ("elastic" , pwd ),
197- json = {"password" : self .AUTH_PASSWORD })
212+ # ...then change passwords, easier to restart with failed tests
213+ req = requests .post ("%s/_security/user/_password" % self ._base_url , auth = (self . _credentials [ 0 ] , pwd ),
214+ json = {"password" : self ._credentials [ 1 ] })
198215 if req .status_code != 200 :
199216 raise Exception ("attempt to change elastic's password failed with code %s" % req .status_code )
200217 # kibana too (debug convenience)
201- req = requests .post ("http://localhost: %s/_security/user/kibana/_password" % self .ES_PORT ,
202- auth = ( "elastic" , self . AUTH_PASSWORD ), json = {"password" : self .AUTH_PASSWORD })
218+ req = requests .post ("%s/_security/user/kibana/_password" % self ._base_url , auth = self . _credentials ,
219+ json = {"password" : self ._credentials [ 1 ] })
203220 if req .status_code != 200 :
204221 print ("ERROR: kibana user password change failed with code: %s" % req .status_code )
205222
223+ # start trial mode
224+ url = "%s/_license/start_trial?acknowledge=true" % self ._base_url
225+ failures = 0
226+ while True :
227+ req = requests .post (url , auth = self ._credentials , timeout = self .REQ_TIMEOUT )
228+ if req .status_code == 200 :
229+ # TODO: check content?
230+ break
231+ print ("starting of trial failed (#%s) with status: %s, text: %s" % (failures , req .status_code , req .text ))
232+ failures += 1
233+ if self .ES_401_RETRIES < failures :
234+ raise Exception ("starting of trial failed with status: %s, text: %s" % (req .status_code , req .text ))
235+ time .sleep (.5 )
236+
206237 def spawn (self , version , root_dir = None , ephemeral = False ):
207238 stage_dir = tempfile .mkdtemp (suffix = ".ITES" , dir = root_dir )
208239 if ephemeral :
@@ -227,7 +258,7 @@ def reset(self, es_dir):
227258 raise Exception ()
228259 except :
229260 raise Exception ("port %s is active; if Elasticsearch is running it needs to be shut down first" %
230- self .ES_PORT )
261+ self ._port )
231262
232263 data_path = os .path .join (es_dir , "data" )
233264 if os .path .isdir (data_path ):
@@ -241,18 +272,21 @@ def reset(self, es_dir):
241272 self ._start_elasticsearch (es_dir )
242273 self ._enable_xpack (es_dir )
243274
244- @staticmethod
245- def is_listening (password = None ):
246- auth = ("elastic" , password ) if password else None
275+ def cluster_name (self , fail_on_non200 = True ):
247276 try :
248- req = requests .get ("http://localhost:%s" % Elasticsearch .ES_PORT , auth = auth , timeout = .5 )
249- except requests .Timeout :
250- return False
251- if req .status_code != 200 :
252- raise Exception ("unexpected ES response code received: %s" % req .status_code )
253- if "You Know, for Search" not in req .text :
254- raise Exception ("unexpected ES answer received: %s" % req .text )
255- return True
256-
277+ resp = requests .get (self ._base_url , auth = self ._credentials , timeout = self .REQ_TIMEOUT )
278+ except (requests .Timeout , requests .ConnectionError ):
279+ return None
280+ if resp .status_code != 200 :
281+ if fail_on_non200 :
282+ raise Exception ("unexpected ES response code received: %s" % resp .status_code )
283+ else :
284+ return ""
285+ if "cluster_name" not in resp .json ():
286+ raise Exception ("unexpected ES answer received: %s" % resp .text )
287+ return resp .json ().get ("cluster_name" )
288+
289+ def is_listening (self ):
290+ return self .cluster_name (False ) is not None
257291
258292# vim: set noet fenc=utf-8 ff=dos sts=0 sw=4 ts=4 tw=118 :
0 commit comments