Forecasting values across categories using Prophet and Python
I'm trying to use Prophet to forecast multiple categories:
def get_prediction(df):
    prediction = {}
    pdf = df.toPandas()
    partner_codes = pdf.partner_code.unique()
    for partner_code in partner_codes:
        partner_code_df = pdf.loc[pdf['partner_code'] == partner_code]
        # additive seasonality; uncertainty interval left at the Prophet default (80%)
        my_model = Prophet(seasonality_mode='additive')
        my_model.fit(partner_code_df)
        future_dates = my_model.make_future_dataframe(periods=12, freq='M')
        forecast = my_model.predict(future_dates)
        prediction[partner_code] = forecast
    return prediction
That piece of code seems to run just fine. But I'm having a hard time getting usable output from it:
forecast = get_prediction(df)
fcst = pd.Series(forecast).to_frame()
print_fcst = spark.createDataFrame(fcst)
display(print_fcst)
This doesn't work, since the output of get_prediction(df) is a dict of forecast DataFrames rather than something spark.createDataFrame can consume directly.
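For reference, a minimal sketch of one way to flatten that dict into a single frame before handing it to Spark (assuming each forecast keeps Prophet's standard output columns such as ds, yhat, yhat_lower and yhat_upper; df, spark and display as in the notebook above):
import pandas as pd

forecasts = get_prediction(df)  # {partner_code: forecast DataFrame}

# stack the per-partner forecasts into one pandas DataFrame,
# keeping the partner code as an ordinary column
frames = []
for partner_code, forecast in forecasts.items():
    forecast = forecast.copy()
    forecast['partner_code'] = partner_code
    frames.append(forecast)
fcst = pd.concat(frames, ignore_index=True)

# keep the columns of interest and convert to a Spark DataFrame
fcst = fcst[['partner_code', 'ds', 'yhat', 'yhat_lower', 'yhat_upper']]
print_fcst = spark.createDataFrame(fcst)
display(print_fcst)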
Any help would be greatly appreciated!
Thanks.
See also questions close to this topic
-
How to display the contents of a text file one line at a time via a timer using Python on Windows?
This is the code:
def wndProc(hWnd, message, wParam, lParam):
    if message == win32con.WM_PAINT:
        hdc, paintStruct = win32gui.BeginPaint(hWnd)
        dpiScale = win32ui.GetDeviceCaps(hdc, win32con.LOGPIXELSX) / 60.0
        fontSize = 36
        # http://msdn.microsoft.com/en-us/library/windows/desktop/dd145037(v=vs.85).aspx
        lf = win32gui.LOGFONT()
        lf.lfFaceName = "Times New Roman"
        lf.lfHeight = int(round(dpiScale * fontSize))
        #lf.lfWeight = 150
        # Use nonantialiased to remove the white edges around the text.
        # lf.lfQuality = win32con.NONANTIALIASED_QUALITY
        hf = win32gui.CreateFontIndirect(lf)
        win32gui.SelectObject(hdc, hf)
        rect = win32gui.GetClientRect(hWnd)
        # http://msdn.microsoft.com/en-us/library/windows/desktop/dd162498(v=vs.85).aspx
        win32gui.DrawText(
            hdc,
            'Glory be to the Father, and to the son and to the Holy Spirit.',
            -1,
            rect,
            win32con.DT_CENTER | win32con.DT_NOCLIP | win32con.DT_VCENTER
        )
        win32gui.EndPaint(hWnd, paintStruct)
        return 0
Where it says the "Glory be to the Father..." prayer, I would like that string to actually display a few different prayers on a timer. What I mean is: I want to save short prayers to a text file and have the displayed line change to a new prayer every 60 seconds, cycling through a few prayers such as the Serenity Prayer.
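A minimal sketch of the timer-driven cycling being asked about, assuming a hypothetical prayers.txt with one prayer per line; the 60-second timer is started with the Win32 SetTimer call via ctypes, and WM_TIMER advances the text before forcing a repaint:
import ctypes
import itertools

import win32con
import win32gui

# load the prayers once and cycle through them forever (prayers.txt is hypothetical)
with open("prayers.txt", encoding="utf-8") as f:
    prayers = itertools.cycle([line.strip() for line in f if line.strip()])
current_prayer = next(prayers)

def start_timer(hWnd):
    # timer id 1, fires every 60 000 ms (60 seconds); call this once the window exists
    ctypes.windll.user32.SetTimer(hWnd, 1, 60000, None)

def wndProc(hWnd, message, wParam, lParam):
    global current_prayer
    if message == win32con.WM_TIMER:
        current_prayer = next(prayers)              # move to the next prayer
        win32gui.InvalidateRect(hWnd, None, True)   # trigger a WM_PAINT
        return 0
    if message == win32con.WM_PAINT:
        # ...existing painting code, but pass current_prayer to win32gui.DrawText
        # instead of the hard-coded string...
        return 0
    return win32gui.DefWindowProc(hWnd, message, wParam, lParam)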
-
How to plot the frequency of my data per day in a histogram?
I want to plot the number of occurrences of my data per day. y represents the ids of my data and x represents the timestamps (in milliseconds), which I convert to a time and a day. But I can't produce the correct plot.
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import time
y = ['5914cce8-fad6-45d1-bec2-e59e62823617', '1c2067e0-5173-4a1d-8a75-b18267ee4598', 'db6830ff-fa9c-4aa5-b71e-f6da9333f357', '672cc9d5-360e-4451-bb7c-03e3d0bd8f0d', 'fb0f8122-fffc-47fe-a87a-b2b749df173b', '558e96ca-0222-40c7-acc0-e444f7663f53', 'c3f86fd5-eac3-48d3-a44c-b325f30b6139', '21dd849f-895f-4cf5-a168-45a4c1a9fbf9', 'e3b4cd56-e291-4671-93b6-d2226ee82ae7', '01346c48-a8c4-43d1-ac02-1efa33ca0f4e', '23b78b0f-85be-4ca7-99f4-1a5add76c12e', 'b1c036c0-0c2b-4170-a170-8fd0add0dec2', '74737546-e9c3-4126-bcb2-4d34503421ca', '342991f5-ec87-4c9d-83eb-9908f3e221aa', '4fdcd83a-eb68-4e26-b79b-753c5e022a4e', 'b7fbeca9-9416-43c4-9e90-9e71acc1eaba', '27c9d358-a3ef-4c69-ba89-eac16d8d3bdb', 'ef982c4b-a115-48a1-aef1-2f672d7f1f00', 'efedede2-9bb4-4c52-98b1-8b03070df3fd', 'eb03ae1b-4cde-409c-8d34-2a16a8be30d2']
x = ['1548143296750', '1548183033872', '1548346185194', '1548443373507', '1548446119319', '1548446239441', '1548446068267', '1548445962159', '1548446011209', '1548446259465', '1548446180380', '1548239985290', '1548240060367', '1548240045347', '1547627568993', '1548755333313', '1548673604016', '1548673443843', '1548673503914', '1548673563975']
date = []
for i in x:
    print(i)
    print()
    i = i[:10]
    print(i)
    readable = time.ctime(int(i))
    readable = readable[:10]
    date.append(readable)
print(date)
plt.hist(date, y)
plt.show()
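A sketch of one way to get the per-day counts (assuming the x values are millisecond epochs, as the [:10] slicing above suggests): convert them to calendar days with pandas, count occurrences, and draw a bar chart.
import matplotlib.pyplot as plt
import pandas as pd

x = ['1548143296750', '1548183033872']  # ...plus the rest of the millisecond timestamps above

# convert millisecond epochs to calendar days, then count occurrences per day
days = pd.to_datetime(pd.Series(x).astype('int64'), unit='ms').dt.date
counts = days.value_counts().sort_index()

counts.plot(kind='bar')
plt.xlabel('day')
plt.ylabel('number of occurrences')
plt.tight_layout()
plt.show()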
-
mysql.connector.errors.ProgrammingError: Error in SQL Syntax
I'm using the Python MySQL connector to add data to a table by updating the row. A user enters a serial number, and then the row with the serial number is added. I keep getting a SQL syntax error and I can't figure out what it is.
query = ("UPDATE `items` SET salesInfo = %s, shippingDate = %s, warrantyExpiration = %s, item = %s, WHERE serialNum = %s") cursor.execute(query, (info, shipDate, warranty, name, sn, )) conn.commit()
Error:
mysql.connector.errors.ProgrammingError: 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE serialNum = '1B0000021A974726'' at line 1
"1B0000021A974726" is a serial number inputted by the user and it is already present in the table.
-
Extending AccumulatorV2 in Spark 2.2 to support <key, value> updates
Problems when extending AccumulatorV2 in Spark 2.2.
My Spark program may have many counters, and I don't want to create and register each of them separately every time I want to count something. So the idea is to create a customized counter whose behavior is like a map: whenever I want to count something in my program, I don't need to create and register a new accumulator; all I need is to call an update function directly, just like counting word frequencies with a hashmap. There's a very similar idea in an earlier post, but when I implement this logic with Spark 2.2's new AccumulatorV2, the accumulator does not seem to update.
Here's my customized accumulator:
public class MapAccumulator extends AccumulatorV2<Map<String, Long>, Map<String, Long>> {

    private Map<String, Long> counters;

    public MapAccumulator() {
        counters = new ConcurrentHashMap<>();
    }

    public MapAccumulator(Map<String, Long> map) {
        counters = new ConcurrentHashMap<>(map);
    }

    @Override
    public boolean isZero() {
        return counters.isEmpty();
    }

    @Override
    public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
        return new MapAccumulator(this.counters);
    }

    @Override
    public void reset() {
        counters.clear();
    }

    @Override
    public void add(Map<String, Long> map) {
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            String key = entry.getKey();
            Long value = entry.getValue();
            counters.put(key, counters.getOrDefault(key, 0L) + value);
        }
    }

    @Override
    public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> other) {
        add(other.value());
    }

    @Override
    public Map<String, Long> value() {
        return counters;
    }
}
And here's my spark program:
public class SparkBaseExp implements Serializable {

    private final transient SparkConf sparkConf;
    private final transient JavaSparkContext jsc;
    private final AccumulatorV2<Map<String, Long>, Map<String, Long>> counterV2;

    private static SparkConf createSparkConf() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("spark.exp");
        sparkConf.set("spark.master", "local[4]");
        sparkConf.set("spark.executor.memory", "1g");
        return sparkConf;
    }

    public SparkBaseExp() {
        sparkConf = createSparkConf();
        jsc = new JavaSparkContext(sparkConf);
        counterV2 = new MapAccumulator();
        jsc.sc().register(counterV2, "MapAccumulator");
    }

    private void run() throws InterruptedException {
        String fileName = "myfile.txt";
        JavaRDD<String> rdd = jsc.textFile(fileName);
        JavaRDD<Integer> lenRdd = rdd.map(new Function<String, Integer>() {
            @Override
            public Integer call(String v) {
                update("line", 1);
                if (v.contains("ip")) {
                    update("ip", 1);
                }
                return v.length();
            }
        });
        List<Integer> list = lenRdd.collect();
        System.out.println("line count: " + getCounterValue("line"));
        System.out.println("Value of ip count: " + getCounterValue("ip"));
    }

    private void update(String key, long value) {
        Map<String, Long> temp = new HashMap<>(1);
        temp.put(key, value);
        counterV2.add(temp);
    }

    private long getCounterValue(String key) {
        if (key == null || !counterV2.value().containsKey(key)) {
            return 0;
        }
        return counterV2.value().get(key);
    }

    private void stop() {
        jsc.stop();
    }

    public static void main(String[] args) throws Exception {
        SparkBaseExp spark = new SparkBaseExp();
        spark.run();
        // Sleep for 5 minutes to review the overall performance
        Thread.sleep(5 * 60 * 1000L);
        spark.stop();
    }
}
I would expect the counts for line and ip to both be non-zero, but the output is all zeros. I'm sure the file content has been loaded successfully, so I'm very confused about why the counter is not updated. Is my way of extending AccumulatorV2 wrong? Any suggestion is appreciated, thanks in advance!
-
StreamingKMeans setSeed()
I need to train StreamingKMeans with a specific value for seed. When I run
val km = new StreamingKMeans(3, 1.0, "points")
km.setRandomCenters(10, 0.5)
val newmodel = km.latestModel.update(featureVectors, 1.0, "points")
val prediction3 = id_features.map(x => (x._1, newmodel.predict(x._2)))
it works fine. But when I try to use setSeed:
km.setRandomCenters(10, 0.5).setSeed(6250L)
I am getting an error:
value setSeed is not a member of org.apache.spark.mllib.clustering.StreamingKMeans
How can I set the seed in this case?
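For what it's worth, the PySpark version of the same model exposes the seed as an explicit third argument of setRandomCenters rather than through a separate setSeed; a sketch (the Scala setRandomCenters appears to accept a seed as an optional third parameter in the same way):
from pyspark.mllib.clustering import StreamingKMeans

# the seed is supplied when the random centers are generated
km = StreamingKMeans(k=3, decayFactor=1.0, timeUnit="points")
km.setRandomCenters(dim=10, weight=0.5, seed=6250)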
-
Spark credential chain ordering - S3 Exception Forbidden
I'm running Spark 2.4 on an EC2 instance. I am assuming an IAM role and setting the key/secret key/token in the sparkSession.sparkContext.hadoopConfiguration, along with the credentials provider as "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider".
When I try to read a dataset from s3 (using s3a, which is also set in the hadoop config), I get an error that says
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 7376FE009AD36330, AWS Error Code: null, AWS Error Message: Forbidden
read command:
val myData = sparkSession.read.parquet("s3a://myBucket/myKey")
I've repeatedly checked the S3 path and it's correct. My assumed IAM role has the right privileges on the S3 bucket. The only thing I can figure at this point is that Spark has some sort of hidden credential chain ordering, and even though I have set the credentials in the Hadoop config, it is still picking up credentials from somewhere else (my instance profile?). But I have no way to diagnose that.
Any help is appreciated. Happy to provide any more details.
-
How to sample groups in pyspark?
I have a dataframe with more than 1M groups, and each group contains about ~100 records (rows). How do I sample based on the distinct groups in pyspark so that the selected groups keep all of their rows?
A much smaller example:
+-----+---+
|group| x |
+-----+---+
|    1|0.1|
|    1|0.2|
|    2|0.1|
|    2|0.5|
|    2|0.3|
|    3|0.5|
|    4|0.8|
|    4|0.5|
+-----+---+
I want to sample so that, if groups 1 and 3 are selected, I get the complete records from them:
+-----+---+
|group| x |
+-----+---+
|    1|0.1|
|    1|0.2|
|    3|0.5|
+-----+---+
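A sketch of one common approach (the column name group is taken from the example above; df is the dataframe shown): sample the distinct group keys, then join back so every row of each selected group is kept.
# sample roughly half of the distinct groups, reproducibly
sampled_groups = df.select("group").distinct() \
    .sample(withReplacement=False, fraction=0.5, seed=42)

# the inner join keeps all rows whose group was selected
sampled_df = df.join(sampled_groups, on="group", how="inner")
sampled_df.show()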
-
How do I create a new column for my dataframe whose values are maps made up of values from different columns?
I've seen similar questions but haven't been able to find exactly what I need and have been struggling to figure out if I can manage to do what I want without using a UDF.
Say I start with this dataframe:
+---+---+---+
| pk|  a|  b|
+---+---+---+
|  1|  2|  1|
|  2|  4|  2|
+---+---+---+
I want the resulting dataframe to look like
+----------------+---+
|              ab| pk|
+----------------+---+
|[A -> 2, B -> 1]|  1|
|[A -> 4, B -> 2]|  2|
+----------------+---+
Where A and B are names that correspond to a and b (I guess I can fix this with an alias, but right now I'm using a UDF that returns a map of {'A': column a value, 'B': column b value}). Is there any way to accomplish this using create_map or otherwise without a UDF?
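A minimal sketch using create_map with literal keys, no UDF (column names pk, a, b and the labels A/B taken from the example above; df is the starting dataframe):
from pyspark.sql import functions as F

# build the map column from literal keys and the existing columns
result = df.select(
    F.create_map(F.lit("A"), F.col("a"), F.lit("B"), F.col("b")).alias("ab"),
    F.col("pk"),
)
result.show(truncate=False)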
-
Query Vertica between dates from PySpark
I have Spark 1.6 running over Python 3.4 and retrieve data from my Vertica database to work on it with the query below. Spark DataFrames support predicate push-down with JDBC sources, but the term predicate is used in a strict SQL sense: it covers only the WHERE clause, and it looks like it is limited to logical conjunctions (no IN or OR, I'm afraid) and simple predicates. The database contains massive data, around 100 billion rows, so I can't retrieve all of it, and Spark 1.6 doesn't allow me to use a query option, only dbtable as schema.table. I get the error below:
java.lang.RuntimeException: Option 'dbtable' not specified
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
url = "*******"
properties = {"user": "*****", "password": "*******", "driver": "com.vertica.jdbc.Driver"}
df = sqlContext.read.format("JDBC").options(
    url=url,
    query="SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK, (connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()
df.show()
I have tried the query below with no result; it takes a long time, even without using a limit.
query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE date(time_stamp) between '2019-01-27' AND '2019-01-29'" df = sqlContext.read.format("JDBC").options( url = url, dbtable="( " + query + " ) as temp", **properties ).load()
Is there any way to read the data as above, or to read it as a dataframe with a specific query?
I have tried to reduce the time by setting more conditions and limits, but it is refused on $conditions; even if I remove the conditions it gives me "Subquery in FROM must have an alias". This is the query:
SELECT min(date(time_stamp)) AS mindate,
       max(date(time_stamp)) AS maxdate,
       count(distinct date(time_stamp)) AS noofdays,
       (subscriber) AS IMSI,
       (server_hostname) AS WEBSITE,
       sum(bytes_in) AS DL,
       sum(bytes_out) AS UL,
       sum(connections_out) AS conn
FROM traffic.stats
WHERE SUBSCRIBER like '41601%'
  AND date(time_stamp) between '2019-01-25' AND '2019-01-29'
  AND signature_service_category = 'Web Browsing'
  AND (signature_service_name = 'SSL v3' OR signature_service_name = 'HTTP2 over TLS')
  AND server_hostname not like '%.googleapis.%'
  AND server_hostname not like '%.google.%'
  AND server_hostname <> 'doubleclick.net'
  AND server_hostname <> 'youtube.com'
  AND server_hostname <> 'googleadservices.com'
  AND server_hostname <> 'app-measurement.com'
  AND server_hostname <> 'gstatic.com'
  AND server_hostname <> 'googlesyndication.com'
  AND server_hostname <> 'google-analytics.com'
  AND server_hostname <> 'googleusercontent.com'
  AND server_hostname <> 'ggpht.com'
  AND server_hostname <> 'googletagmanager.com'
  AND server_hostname is not null
GROUP BY subscriber, server_hostname
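For reference, a sketch of wrapping this longer query the same way as the earlier attempt, since Spark's JDBC reader treats dbtable as a table expression that must be parenthesised and given an alias (long_query here stands for the full SQL above; url and properties as before):
long_query = "SELECT min(date(time_stamp)) AS mindate, ... GROUP BY subscriber, server_hostname"  # the full query above

df = sqlContext.read.format("jdbc").options(
    url=url,
    dbtable="( " + long_query + " ) AS t",  # parenthesised subquery with an alias
    **properties
).load()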
-
Stream data into Azure Databricks using Event Hubs
I want to send messages from a Twitter application to an Azure event hub. However, I am getting an error that says to use java.util.concurrent.ScheduledExecutorService instead of java.util.concurrent.ExecutorService. I do not know how to call EventHubClient.create now. Please help.
I am referring to code from the link
https://docs.microsoft.com/en-us/azure/azure-databricks/databricks-stream-from-eventhubs
This is the error I am getting:
notebook:15: error: type mismatch;
 found   : java.util.concurrent.ExecutorService
 required: java.util.concurrent.ScheduledExecutorService
val pool = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)
Here is my code:
import java.util._
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent.{Executors, ExecutorService}

val pool: ExecutorService = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)
-
Databricks with Python can't use fs module: AttributeError: module 'dbutils' has no attribute 'fs'
I am using Azure Databricks for the first time to read some files, and I am trying to use Python with dbutils.fs.ls("/mnt").
But I get an error saying that dbutils doesn't have an fs module. From what I've read, all Databricks clusters come with dbutils already.
AttributeError: module 'dbutils' has no attribute 'fs'
If I do
print(dir(dbutils))
['Console', 'DBUtils', 'FileInfo', 'Iterable', 'ListConverter', 'MapConverter', 'MountInfo', 'NotebookExit', 'Py4JJavaError', 'SecretMetadata', 'SecretScope', 'WidgetsHandlerImpl', 'builtins', 'cached', 'doc', 'file', 'loader', 'name', 'package', 'spec', 'absolute_import', 'makeTensorboardManager', 'namedtuple', 'print_function', 'range', 'stderr', 'stdout', 'string_types', 'sys', 'zip']
I found that the library is supposed to be installed already: https://docs.databricks.com/user-guide/dev-tools/dbutils.html#dbutils
Is there a magic trick? I want to check whether a file is mounted and, if not, mount it and later unmount it.
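A sketch of a workaround that Databricks documentation shows for getting a dbutils handle from Python code instead of importing the dbutils module (assumes a SparkSession named spark is available; in a notebook the built-in dbutils object works without any import):
def get_dbutils(spark):
    try:
        from pyspark.dbutils import DBUtils   # available on Databricks clusters
        return DBUtils(spark)
    except ImportError:
        import IPython                        # fall back to the notebook-injected object
        return IPython.get_ipython().user_ns["dbutils"]

dbutils = get_dbutils(spark)
print(dbutils.fs.ls("/mnt"))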
-
Reference uploaded JAR library
I've built a set of support functions into a helper.jar library and imported it to a Databricks cluster. The jar is installed on the cluster, but I'm not able to reference the functions in the library. The jar import has been tested, the cluster has been restarted, and the jar can be referenced in IntelliJ where it was developed.
//next line generates error: value helper is not a member of org.apache.spark.sql.SparkSession
import helper
//next line generates error: not found: value fn_conversion
display(df.withColumn("RevenueConstantUSD", fn_conversion($"Revenue")))
I'd expect the helper functions to be visible after library deployment, or possibly after adding the import statement.
-
Error: package or namespace load failed for ‘prophet’
I have installed Rtools (rtools35.exe) and also installed rstan before installing prophet, but it gives me an error every time, like this:
> library(prophet)
Error: package or namespace load failed for ‘prophet’:
 .onLoad failed in loadNamespace() for 'prophet', details:
  call: system2(file.path(Sys.getenv("R_HOME"), "bin", "R"), args = "CMD SHLIB --dry-run",
  error: 'CreateProcess' failed to run 'C:\PROGRA~1\R\R-35~1.2\bin\R.exe CMD SHLIB --dry-run'
My Rtools is installed in C:\Rtools whereas my R3.5.2 version is installed in C:\Program Files\R\R3.5.2
Does it have something to do with the installation paths?
-
Using prophet package in shiny dashboard?
I am working with shiny (shinydashboard) and also forecasting my dataset with the prophet package. Shiny works perfectly, and so does my forecast. Now I would like to insert the forecasting code into shiny so that the forecast plot is shown inside the app. That's not working: the output in the shiny app is just white and shows nothing.
server code:
output$fcCrime <- renderPlot({
  Year12_16 <- cc[cc$Year2 %in% c("2012", "2013", "2014", "2015", "2016"),]
  df <- Year12_16 %>%
    group_by(Date2) %>%
    summarise(y = n()) %>%
    mutate(y = log(y))
  names(df) <- c("ds", "y")
  df$ds <- factor(df$ds)
  m <- prophet(df)
  future <- make_future_dataframe(m, 365*4)
  forecast <- predict(m, future)
  tail(forecast[c('ds', 'yhat', 'yhat_lower', 'yhat_upper')])
  #plot(m, forecast, xlabel = "Year", ylabel = "Data")+ggtitle("Forecast of Crimes in Chicago")
  dyplot.prophet(m, forecast)
  #prophet_plot_components(m, forecast)
})
UPDATE: Working with this dataset: https://www.kaggle.com/currie32/crimes-in-chicago
I have also updated the server code. The thing is, without shiny everything plots and works fine, but inside the shiny app nothing is plotted on the dashboard.
-
plot_components in python fbprophet returns duplicate plots
I am using prophet in Python for time series forecasting. After fitting the model, when I use plot_components(forecast) to plot the seasonal components, I get two copies of each plot: two copies of the trend, of the daily seasonality, etc. Any idea why?