
Issues when initializing SparkSession and KafkaUtils.createStream #58

Open
anishsharma opened this issue Feb 27, 2018 · 1 comment

Comments

@anishsharma

anishsharma commented Feb 27, 2018

Hi,

I am running your ml_pipeline example with the following configuration:
Python : 3.6
Keras : 2
Spark : 2
Kafka : 2.11
Java : 8

When I submit the script with spark-submit, I get the following error:

'D:\spark\bin\spark-submit2.cmd" --conf "spark.app.name' is not recognized as an internal or external command, operable program or batch file.
Traceback (most recent call last):
  File "ml_pipeline.py", line 69, in <module>
    sc = SparkSession.builder.config(conf=conf).appName(application_name).getOrCreate()
  File "d:\Anaconda3\lib\site-packages\pyspark\sql\session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 334, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 283, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "d:\Anaconda3\lib\site-packages\pyspark\java_gateway.py", line 95, in launch_gateway
    raise Exception("Java gateway process exited before sending the driver its port number")
Exception: Java gateway process exited before sending the driver its port number

application_name = "Distributed Keras Kafka Pipeline"

I worked around this error by removing the spaces from the application name "Distributed Keras Kafka Pipeline". That seemed strange to me, but it did work; I am not sure why.
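For reference, here is a minimal sketch of how I now create the session with a space-free name (the name and settings below are placeholders, not the exact values from ml_pipeline.py). My guess is that spark-submit2.cmd on Windows mishandles quoting when a --conf value such as spark.app.name contains spaces.

```python
# Minimal sketch, assuming the application name is only used for display:
# keeping it free of spaces avoids the Windows spark-submit2.cmd quoting issue.
from pyspark import SparkConf
from pyspark.sql import SparkSession

application_name = "DistributedKerasKafkaPipeline"  # placeholder, no spaces

conf = SparkConf()  # any additional Spark settings would go here

spark = SparkSession.builder \
    .config(conf=conf) \
    .appName(application_name) \
    .getOrCreate()
```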

Now I am stuck on this error: "AttributeError: 'SparkSession' object has no attribute '_getJavaStorageLevel'". I did some research and found that the default storage level has changed to MEMORY_AND_DISK, so I don't think I need to pass any such value in the configuration. I appreciate your help.

EDIT:

I have resolved the above issue by using the relevant spark-streaming-kafka assembly jar, and the streaming context is now created successfully. But now I am facing another issue.
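For completeness, here is a sketch of how the streaming side is wired up after that fix. The jar version, ZooKeeper address, topic, consumer group, and batch interval below are placeholders, and the StreamingContext is built from the SparkContext rather than the SparkSession, which as far as I can tell is what the '_getJavaStorageLevel' error was pointing to.

```python
# Sketch only; submitted with something like
#   spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-<version>.jar ml_pipeline.py
# where the assembly jar has to match the Spark and Scala versions in use.
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = spark.sparkContext             # use the SparkContext, not the SparkSession
ssc = StreamingContext(sc, 10)      # 10-second batch interval (placeholder)

kafka_stream = KafkaUtils.createStream(
    ssc,
    "localhost:2181",               # ZooKeeper quorum (placeholder)
    "ml-pipeline-consumer",         # consumer group id (placeholder)
    {"ml-pipeline-topic": 1}        # topic -> number of consumer threads (placeholder)
)
```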

On the first iteration, as control goes into the predict(df) method, I get the following errors:

============================PREPARED DATAFRAME===========================
DataFrame[DER_deltaeta_jet_jet: double, DER_deltar_tau_lep: double, DER_lep_eta_centrality: double, DER_mass_MMC: double, DER_mass_jet_jet: double, DER_mass_transverse_met_lep: double, DER_mass_vis: double, DER_met_phi_centrality: double, DER_prodeta_jet_jet: double, DER_pt_h: double, DER_pt_ratio_lep_tau: double, DER_pt_tot: double, DER_sum_pt: double, EventId: double, PRI_jet_all_pt: double, PRI_jet_leading_eta: double, PRI_jet_leading_phi: double, PRI_jet_leading_pt: double, PRI_jet_num: double, PRI_jet_subleading_eta: double, PRI_jet_subleading_phi: double, PRI_jet_subleading_pt: double, PRI_lep_eta: double, PRI_lep_phi: double, PRI_lep_pt: double, PRI_met: double, PRI_met_phi: double, PRI_met_sumet: double, PRI_tau_eta: double, PRI_tau_phi: double, PRI_tau_pt: double, features: vector, features_normalized: vector]
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890400 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890600 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890800 replicated to only 0 peer(s) instead of 1 peers
2018-03-01 15:04:51.914911: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE instructions, but these are available on your machine and could speed up CPU computations.
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896891600 replicated to only 0 peer(s) instead of 1 peers
2018-03-01 15:04:51.923321: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE2 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.940857: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.949033: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.959770: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.970355: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.979551: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.989807: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
18/03/01 15:04:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:52 WARN BlockManager: Block input-0-1519896891800 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:52 WARN BlockManager: Block input-0-1519896892000 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:52 ERROR JobScheduler: Error running job streaming job 1519896870000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "d:\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 65, in call
    r = self.func(t, *rdds)
  File "d:\Anaconda3\lib\site-packages\pyspark\streaming\dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "ml_pipeline.py", line 132, in process_instances
    df = predict(df)  # Add the raw Neural Network predictions.
  File "ml_pipeline.py", line 91, in predict
    predictor = ModelPredictor(keras_model=model, features_col="features_normalized", output_col="prediction")
  File "d:\dist-keras\distkeras\predictors.py", line 45, in __init__
    super(ModelPredictor, self).__init__(keras_model)
  File "d:\dist-keras\distkeras\predictors.py", line 23, in __init__
    self.model = serialize_keras_model(keras_model)
  File "d:\dist-keras\distkeras\utils.py", line 84, in serialize_keras_model
    dictionary['weights'] = model.get_weights()
  File "d:\Anaconda3\lib\site-packages\keras\models.py", line 699, in get_weights
    return self.model.get_weights()
  File "d:\Anaconda3\lib\site-packages\keras\engine\topology.py", line 2008, in get_weights
    return K.batch_get_value(weights)
  File "d:\Anaconda3\lib\site-packages\keras\backend\tensorflow_backend.py", line 2320, in batch_get_value
    return get_session().run(ops)
  File "d:\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 789, in run
    run_metadata_ptr)
  File "d:\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 927, in _run
    raise RuntimeError('The Session graph is empty. Add operations to the '
RuntimeError: The Session graph is empty. Add operations to the graph before calling run().
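From what I can tell, the error means that the session returned by the Keras backend's get_session() is bound to an empty TensorFlow graph, different from the graph the model was built in. Below is a sketch of the kind of workaround I have been experimenting with before handing the model to ModelPredictor; the model path is a placeholder and I have not confirmed this is the intended fix for dist-keras.

```python
# Sketch of a possible workaround (unverified): load/build the Keras model with
# the backend session explicitly bound to the same graph, so that get_weights()
# does not run against an empty default graph.
import tensorflow as tf
from keras import backend as K
from keras.models import load_model

graph = tf.Graph()
with graph.as_default():
    session = tf.Session(graph=graph)
    K.set_session(session)          # point the Keras backend at this session
    model = load_model("model.h5")  # placeholder path; load or build the model here
    weights = model.get_weights()   # should no longer raise "Session graph is empty"
```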

Any help would be appreciated. Am I missing anything here?

Thanks & Regards
Anish Sharma

@S-shweta

S-shweta commented Mar 8, 2018

Hi,

I am facing the same error. Any hints toward a resolution?
