package org.apache.spark.examples.streaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} object NetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: NetworkWordCount ") System.exit(1) } StreamingExamples.setStreamingLogLevels() // Create the context with a 1 second batch size val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') // Note that no duplication in storage level only for running locally. // Replication necessary in distributed scenario for fault tolerance. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }