diff --git a/projects/liza22/pom.xml b/projects/liza22/pom.xml new file mode 100644 index 0000000..29abee2 --- /dev/null +++ b/projects/liza22/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + ru.mipt.diht.students + parent + 1.0-SNAPSHOT + + ru.mipt.diht.students + liza22 + 1.0-SNAPSHOT + liza22 + http://maven.apache.org + + UTF-8 + + + + junit + junit + 3.8.1 + test + + + org.twitter4j + twitter4j-core + [4.0,) + + + org.twitter4j + twitter4j-stream + [4.0,) + + + com.beust + jcommander + 1.48 + + diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/App.java b/projects/liza22/src/main/java/ru/mipt/diht/students/App.java new file mode 100644 index 0000000..31d6fa9 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/App.java @@ -0,0 +1,11 @@ +package ru.mipt.diht.students; + +/** + * Hello world! + * + */ +public class App { + public static void main(String[] args) { + System.out.println("Hello World!"); + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/.gitignore b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/.gitignore new file mode 100644 index 0000000..031401d --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/.gitignore @@ -0,0 +1 @@ +twitter.cfg \ No newline at end of file diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/pom.xml b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/pom.xml new file mode 100644 index 0000000..d3f7776 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/pom.xml @@ -0,0 +1,75 @@ + + + 4.0.0 + + com.twitter.stream + twitterstream + 1.0 + + + 1.7 + 1.7 + + 1.48 + 4.0.4 + + + + + com.beust + jcommander + ${jcommander-version} + + + org.twitter4j + twitter4j-core + ${twitter4j-version} + + + org.twitter4j + twitter4j-stream + ${twitter4j-version} + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + TwitterStream + lib/*d + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.5.1 + + + copy-dependencies + package + + copy-dependencies + + + runtime + ${project.build.directory}/lib/ + + + + + + + + + \ No newline at end of file diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/TwitterStream.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/TwitterStream.java new file mode 100644 index 0000000..01aaba5 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/TwitterStream.java @@ -0,0 +1,85 @@ +import com.beust.jcommander.JCommander; +import config.Arguments; +import config.Constants; +import config.TwitterConfig; +import core.handling.TweetHandler; +import core.handling.TweetHandlerFactory; +import core.providing.TweetsProvider; +import core.providing.TweetsProviderFactory; +import model.Mode; + +import java.io.*; + +/** + * Main Class. + */ + +public class TwitterStream { + + public static final int LINE_LENGTH = 1024; + + public static void main(final String[] argsString) { + extractArguments(argsString); + Arguments arguments = Arguments.getInstance(); + + /* In case of HELP page is requested, + * just print HELP file content and exit application + */ + if (arguments.isHelpRequest()) { + printHelp(System.out); + System.exit(0); + } + try { + // read and initialize twitter configuration + TwitterConfig twitterConfig = new TwitterConfig(); + // define working mode of application + Mode workingMode; + if (arguments.isStreamMode()) { + workingMode = Mode.STREAM; + } else { + workingMode = Mode.QUERY; + } + // get tweets provider for selected working mode and initialize this one + TweetsProvider tweetsProvider = TweetsProviderFactory.getProvider(workingMode); + tweetsProvider.init(twitterConfig); + // get tweet handler for selected working mode + TweetHandler tweetHandler = TweetHandlerFactory.getHandler(workingMode); + // start tweets providing and handling + tweetsProvider.provide(tweetHandler); + } catch (Exception e) { + System.err.println("Application TwitterStream has been occasionally crashed " + + "with error = \"" + e.getMessage() + "\""); + System.err.println("Application will be terminated with error code."); + System.exit(1); + } + } + + /** + * Transforms arguments string to Arguments object with parsed fields. + * @param argsString arguments strings from the input + */ + private static void extractArguments(final String[] argsString) { + JCommander jCommander = new JCommander(); + jCommander.addObject(Arguments.getInstance()); + jCommander.parse(argsString); + } + + /** + * Prints the content of HELP file to the passed output stream. + * @param out stream where HELP page will be printed + */ + private static void printHelp(OutputStream out) { + try { + byte[] buffer = new byte[LINE_LENGTH]; + try (InputStream input = TwitterStream.class.getClassLoader().getResourceAsStream(Constants.HELP_FILE)) { + int length = input.read(buffer); + while (length != -1) { + out.write(buffer, 0, length); + length = input.read(buffer); + } + } + } catch (IOException e) { + System.err.println("Problem with reading help file: \"" + e.getMessage() + "\""); + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/Arguments.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/Arguments.java new file mode 100644 index 0000000..461846a --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/Arguments.java @@ -0,0 +1,107 @@ +package config; + +import com.beust.jcommander.Parameter; + +import java.util.List; + +/** + * Arguments storage. + * Used JCommander library. + * + * @see http://jcommander.org/ + */ +public final class Arguments { + private static final Arguments INSTANCE = new Arguments(); + + private Arguments() { } + + public static Arguments getInstance() { + return INSTANCE; + } + + @Parameter(names = {"--query", "-q"}, + required = true, + description = "Query or keywords for stream") + private List keywords; + + @Parameter(names = {"--place", "-p"}, + required = true, + description = "Location for search tweets") + private String place; + + @Parameter(names = {"--stream", "-s"}, + description = "Stream mode when tweets printed with delay") + private boolean streamMode; + + @Parameter(names = "--hideRetweets", + description = "Hides retweets at all") + private boolean hideRetweets; + + @Parameter(names = {"--limit", "-l"}, + description = "Limits the number of printed tweets") + private Integer limitOfTweets = Constants.NO_TWEETS_LIMIT; + + @Parameter(names = {"--help", "-h"}, + help = true, + description = "Requests the help page") + private boolean helpRequest; + + @Parameter(names = {"--verbose", "-v"}, + description = "Verbose mode to print more information") + private boolean verbose; + + /** + * Array of keywords in case of tweets stream requested. + * @return array of keywords to be tracked + */ + public String[] getKeywords() { + String[] keywordsArray = new String[keywords.size()]; + return keywords.toArray(keywordsArray); + } + + /** + * Suppose that query for Search tweets is element with 0 index. + * @return search tweets query + */ + public String getQuery() { + return keywords.get(0); + } + + public String getPlace() { + return place; + } + + public boolean isStreamMode() { + return streamMode; + } + + public boolean hideRetweets() { + return hideRetweets; + } + + public Integer getLimitOfTweets() { + return limitOfTweets; + } + + public boolean isHelpRequest() { + return helpRequest; + } + + public boolean isVerboseMode() { + return verbose; + } + + @Override + public String toString() { + return "Arguments{" + + "keywords=" + keywords + + ", place='" + place + + '\'' + + ", streamMode=" + streamMode + + ", hideRetweets=" + hideRetweets + + ", limitOfTweets=" + limitOfTweets + + ", helpRequest=" + helpRequest + + ", verboseMode=" + verbose + + '}'; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/Constants.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/Constants.java new file mode 100644 index 0000000..b9945af --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/Constants.java @@ -0,0 +1,31 @@ +package config; + +/** + * Application constants storage. + */ +public final class Constants { + /** + * Reconnect timeout to twitter in seconds . + */ + public static final int RECONNECT_TIMEOUT_SECS = 10; + /** + * Default value of limit argument. + */ + public static final int NO_TWEETS_LIMIT = -1; + /** + * Delay between two printing of tweets. + */ + public static final int PRINT_TWEET_DELAY_SECS = 1; + /** + * Message which is printed in verbose mode when no any tweet to print for stream. + */ + public static final String NO_TWEET_MESSAGE = "..."; + /** + * Name of resource - twitter config file. + */ + public static final String TWITTER_CONFIG_FILE = "twitter.cfg"; + /** + * Name of resource - help content file. + */ + public static final String HELP_FILE = "help.txt"; +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/TwitterConfig.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/TwitterConfig.java new file mode 100644 index 0000000..9cee397 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/config/TwitterConfig.java @@ -0,0 +1,101 @@ +package config; + +import twitter4j.auth.AccessToken; +import twitter4j.conf.Configuration; +import twitter4j.conf.ConfigurationBuilder; + +import java.io.*; + +/* + * Class loads and holds Twitter Access configuration from file resource + * and provides methods to get {@link twitter4j.conf.Configuration} and {@link twitter4j.auth.AccessToken} + */ +public class TwitterConfig { + private static final String CONSUMER_KEY_PROP_NAME = "consumerKey"; + private static final String CONSUMER_SECRET_PROP_NAME = "consumerSecret"; + private static final String ACCESS_TOKEN_PROP_NAME = "accessToken"; + private static final String ACCESS_TOKEN_SECRET_PROP_NAME = "accessTokenSecret"; + + private String consumerKey; + private String consumerSecret; + private String accessToken; + private String accessTokenSecret; + + public TwitterConfig() { + init(); + } + + private void init() { + File cfgFile = new File(Constants.TWITTER_CONFIG_FILE); + try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(cfgFile)))) { + while (in.ready()) { + String line = in.readLine(); + int indexOfDelimiter = line.indexOf('='); + if (indexOfDelimiter == -1) { + System.err.println("Incorrect line in twitter configuration = '" + line + "'"); + continue; + } + String propName = line.substring(0, indexOfDelimiter); + String propValue = line.substring(indexOfDelimiter + 1, line.length()); + switch (propName) { + case CONSUMER_KEY_PROP_NAME: + consumerKey = propValue; + break; + case CONSUMER_SECRET_PROP_NAME: + consumerSecret = propValue; + break; + case ACCESS_TOKEN_PROP_NAME: + accessToken = propValue; + break; + case ACCESS_TOKEN_SECRET_PROP_NAME: + accessTokenSecret = propValue; + break; + default: + System.err.println("Property '" + propName + "' not recognized"); + } + } + } catch (FileNotFoundException e) { + System.err.println("Twitter config file by path = \"" + cfgFile.getAbsolutePath() + "\" not found"); + } catch (IOException e) { + System.err.println("Problem with reading twitter config file: " + e.getMessage()); + } + + validate(); + } + + private void validate() { + if (consumerKey == null + || consumerSecret == null + || accessToken == null + || accessTokenSecret == null) { + throw new IllegalStateException("Twitter configuration file is incorrect"); + } + } + + public final AccessToken getAccessToken() { + return new AccessToken(accessToken, accessTokenSecret); + } + + public final Configuration getConfiguration() { + ConfigurationBuilder configurationBuilder = new ConfigurationBuilder(). + setOAuthConsumerKey(consumerKey). + setOAuthConsumerSecret(consumerSecret). + setOAuthAccessToken(accessToken). + setOAuthAccessTokenSecret(accessTokenSecret); + return configurationBuilder.build(); + } + + @Override + public final String toString() { + return "TwitterConfig{" + + "consumerKey='" + consumerKey + + '\'' + + ", consumerSecret='" + consumerSecret + + '\'' + + ", accessToken='" + accessToken + + '\'' + + ", accessTokenSecret='" + accessTokenSecret + + '\'' + + '}'; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/TweetHandler.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/TweetHandler.java new file mode 100644 index 0000000..01d81fc --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/TweetHandler.java @@ -0,0 +1,15 @@ +package core.handling; + +import model.Tweet; + +/** + * Tweet handler interface. + */ +public interface TweetHandler { + + /** + * Handles tweet by any way. + * @param tweet obtained tweet + */ + void handle(Tweet tweet); +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/TweetHandlerFactory.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/TweetHandlerFactory.java new file mode 100644 index 0000000..807bc25 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/TweetHandlerFactory.java @@ -0,0 +1,26 @@ +package core.handling; + +import core.handling.impl.PrintResultOfQueryTweetsHandler; +import core.handling.impl.PrintStreamOfTweetsHandler; +import model.Mode; + +public final class TweetHandlerFactory { + + private TweetHandlerFactory() { } + + /** + * Gets tweets handler depending on the working mode. + * @param mode working mode of application + * @return tweet handler implementation + */ + public static TweetHandler getHandler(Mode mode) { + switch (mode) { + case STREAM: + return new PrintStreamOfTweetsHandler(System.out); + case QUERY: + return new PrintResultOfQueryTweetsHandler(System.out); + default: + throw new IllegalArgumentException("Mode = " + mode + " is not supported"); + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/impl/PrintResultOfQueryTweetsHandler.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/impl/PrintResultOfQueryTweetsHandler.java new file mode 100644 index 0000000..b244d2e --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/impl/PrintResultOfQueryTweetsHandler.java @@ -0,0 +1,105 @@ +package core.handling.impl; + +import core.handling.TweetHandler; +import model.Tweet; +import utils.TextUtils; + +import java.io.PrintStream; +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This handler is used in QUERY mode of application. + * It prints tweets every time when new tweet is come. + * + * When new tweet is obtained (method 'handle' invoked) this handler prints + * formatted representation of this new tweet. + */ +public final class PrintResultOfQueryTweetsHandler implements TweetHandler { + private static final int MILLI = 1_000; + private static final int SEC_IN_MIN = 60; + private static final int STRING_SIZE = 256; + + private PrintStream out; + + private AtomicLong tweetCounter = new AtomicLong(0); + + public PrintResultOfQueryTweetsHandler(PrintStream outStream) { + this.out = outStream; + } + + @Override + public void handle(Tweet tweet) { + out.println("Tweet#" + tweetCounter.incrementAndGet() + ":"); + out.println(formatTweet(tweet)); + } + + /* + * Print format is the following: + * + * If tweet IS NOT retweeted + * ---------------------------------------------------------------------------------------- + * [] @: + * ---------------------------------------------------------------------------------------- + * + * If tweet IS retweeted + * ---------------------------------------------------------------------------------------- + * [] @: ретвитнул @: ( ретвитов) + * ---------------------------------------------------------------------------------------- + * + * @param tweet tweet object to be printed + * @return text representation of tweet according to format + */ + private static String formatTweet(Tweet tweet) { + StringBuilder tweetView = new StringBuilder(STRING_SIZE); + tweetView.append("----------------------------------------------------------------------------------------\n"); + if (tweet.isNotRetweet()) { + tweetView.append("[").append(formatTime(tweet.getTime())).append("] "). + append("@").append(getNickname(tweet)). + append(": ").append(tweet.getText()); + } else { + Tweet retweetedTweet = tweet.getRetweetedTweet(); + tweetView.append("[").append(formatTime(tweet.getTime())).append("] "). + append("@").append(getNickname(tweet)). + append(": ретвитнул @").append(getNickname(retweetedTweet)). + append(": ").append(retweetedTweet.getText()). + append(" (").append(retweetedTweet.getRetweetCount()).append(" ретвитов)"); + } + tweetView.append("\n----------------------------------------------------------------------------------------"); + return tweetView.toString(); + } + + private static String getNickname(Tweet tweet) { + String nick = tweet.getAuthor().getName(); + return TextUtils.coloredText(nick, TextUtils.COLOR_BLUE); + } + + /* + * Time format is the following: + * Время должно быть в формате: + * "Только что" - если менее 2х минут назад + * "n минут назад" - если менее часа назад (n - цифрами) + * "n часов назад" - если более часа, но сегодня (n - цифрами) + * "вчера" - если вчера + * "n дней назад" - в остальных случаях (n - цифрами) + * + * @param then the tweet's time in milliseconds + * @return text representation of the tweet's time according to format + */ + private static String formatTime(long then) { + long now = new Date().getTime(); + long diffInMinutes = (now - then) / MILLI / SEC_IN_MIN; + if (diffInMinutes < 2) { + return "Только что"; + } else if (TimeUnit.MINUTES.toHours(diffInMinutes) < 1) { + return diffInMinutes + " минут назад"; + } else if (TimeUnit.MINUTES.toHours(diffInMinutes) >= 1 && TimeUnit.MINUTES.toDays(diffInMinutes) < 1) { + return TimeUnit.MINUTES.toHours(diffInMinutes) + " часов назад"; + } else if (TimeUnit.MINUTES.toDays(diffInMinutes) >= 1 && TimeUnit.MINUTES.toDays(diffInMinutes) < 2) { + return "вчера"; + } else { + return TimeUnit.MINUTES.toDays(diffInMinutes) + " дней назад"; + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/impl/PrintStreamOfTweetsHandler.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/impl/PrintStreamOfTweetsHandler.java new file mode 100644 index 0000000..87e6e9a --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/handling/impl/PrintStreamOfTweetsHandler.java @@ -0,0 +1,101 @@ +package core.handling.impl; + +import config.Arguments; +import config.Constants; +import core.handling.TweetHandler; +import model.Tweet; +import utils.TextUtils; + +import java.io.PrintStream; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; + +/* + * This handler is used in STREAM mode of application. + * It prints tweets every Constants.PRINT_TWEET_DELAY_SECS (by default every 1 second). + * + * When new tweet is obtained (method 'handle' invoked) this handler stores this tweet + * to internal queue. When next print step is come, handler takes tweet from queue + * (by 'poll' method) and prints this tweet. + * In case when no available tweet in queue, it will print nothing or + * Constants.NO_TWEET_MESSAGE in VERBOSE mode. + */ +public final class PrintStreamOfTweetsHandler implements TweetHandler { + public static final int STRING_SIZE = 256; + public static final int KILO = 1_000; + private PrintStream out; + private Queue tweetQueue; + + private boolean started = false; + private AtomicLong tweetCounter = new AtomicLong(0); + + public PrintStreamOfTweetsHandler(final PrintStream outStream) { + this.out = outStream; + tweetQueue = new ArrayDeque<>(); + } + + @Override + public void handle(Tweet tweet) { + tweetQueue.offer(tweet); + + if (!started) { + // schedule timer with task of printing tweets from queue + new Timer().schedule(new TimerTask() { + @Override + public void run() { + Tweet next = tweetQueue.poll(); + if (next != null) { + out.println("Tweet#" + tweetCounter.incrementAndGet() + ":"); + out.println(formatTweet(next)); + } else { + if (Arguments.getInstance().isVerboseMode()) { + out.println(Constants.NO_TWEET_MESSAGE); + } + } + } + }, 0, Constants.PRINT_TWEET_DELAY_SECS * KILO); + started = true; + } + } + + /* + * Print format is the following: + * + * If tweet IS NOT retweeted + * ---------------------------------------------------------------------------------------- + * @: + * ---------------------------------------------------------------------------------------- + * + * If tweet IS retweeted + * ---------------------------------------------------------------------------------------- + * @: ретвитнул @: ( ретвитов) + * ---------------------------------------------------------------------------------------- + * + * @param tweet tweet object to be printed + * @return text representation of tweet according to format + */ + private static String formatTweet(Tweet tweet) { + StringBuilder tweetView = new StringBuilder(STRING_SIZE); + tweetView.append("----------------------------------------------------------------------------------------\n"); + if (tweet.isNotRetweet()) { + tweetView.append("@").append(getNickname(tweet)). + append(": ").append(tweet.getText()); + } else { + Tweet retweetedTweet = tweet.getRetweetedTweet(); + tweetView.append("@").append(getNickname(tweet)). + append(": ретвитнул @").append(getNickname(retweetedTweet)). + append(": ").append(retweetedTweet.getText()). + append(" (").append(retweetedTweet.getRetweetCount()).append(" ретвитов)"); + } + tweetView.append("\n----------------------------------------------------------------------------------------"); + return tweetView.toString(); + } + + private static String getNickname(Tweet tweet) { + String nick = tweet.getAuthor().getName(); + return TextUtils.coloredText(nick, TextUtils.COLOR_BLUE); + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/TweetsProvider.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/TweetsProvider.java new file mode 100644 index 0000000..e56766a --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/TweetsProvider.java @@ -0,0 +1,14 @@ +package core.providing; + +import config.TwitterConfig; +import core.handling.TweetHandler; + +/** + * Tweets provider interface. + */ +public interface TweetsProvider { + + void init(TwitterConfig twitterConfig); + + void provide(TweetHandler handler); +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/TweetsProviderFactory.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/TweetsProviderFactory.java new file mode 100644 index 0000000..664dd46 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/TweetsProviderFactory.java @@ -0,0 +1,29 @@ +package core.providing; + +import core.providing.impl.TweetsByQueryProvider; +import core.providing.impl.TweetsStreamProvider; +import model.Mode; + +/** + * Tweets provider factory. + */ +public final class TweetsProviderFactory { + + private TweetsProviderFactory() { } + + /** + * Gets tweets provider depending on the working mode. + * @param mode working mode of application + * @return tweets provider implementation + */ + public static TweetsProvider getProvider(Mode mode) { + switch (mode) { + case STREAM: + return new TweetsStreamProvider(); + case QUERY: + return new TweetsByQueryProvider(); + default: + throw new IllegalArgumentException("Mode = " + mode + " is not supported"); + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/impl/TweetsByQueryProvider.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/impl/TweetsByQueryProvider.java new file mode 100644 index 0000000..ba200c3 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/impl/TweetsByQueryProvider.java @@ -0,0 +1,65 @@ +package core.providing.impl; + +import config.Arguments; +import config.Constants; +import config.TwitterConfig; +import core.handling.TweetHandler; +import core.providing.TweetsProvider; +import core.quering.SearchQueryBuilder; +import model.Tweet; +import twitter4j.*; + +import java.util.concurrent.TimeUnit; + +/** + * Tweets by query provider, provides requested tweets to handler. + * + * Query is built by SearchQueryBuilder and method 'search' of + * Twitter instance is invoked. + * + * QueryResult contains list of statuses, which transformed to + * internal Tweet objects, after that these tweets are sent to + * handler. + */ +public final class TweetsByQueryProvider implements TweetsProvider { + private Twitter twitter; + private Query query; + + @Override + public void init(TwitterConfig twitterConfig) { + twitter = new TwitterFactory(twitterConfig.getConfiguration()).getInstance(); + query = new SearchQueryBuilder(twitter).buildQuery(); + } + + @Override + public void provide(TweetHandler handler) { + try { + QueryResult result = twitter.search(query); + if (result.getTweets() == null || result.getTweets().isEmpty()) { + System.out.println("No any tweets by specified query and place found"); + } + for (Status status : result.getTweets()) { + // transform status to Tweet object and send to handler + Tweet tweet = Tweet.valueOf(status); + if (Arguments.getInstance().hideRetweets() && tweet.isRetweet()) { + // skip this tweet + continue; + } + handler.handle(tweet); + } + } catch (TwitterException e) { + System.err.println("Twitter has been occasionally crashed with error: \"" + e.getMessage() + "\""); + System.err.println("Try one more time... [timeout = " + Constants.RECONNECT_TIMEOUT_SECS + " secs]"); + timeout(Constants.RECONNECT_TIMEOUT_SECS); + provide(handler); + } + } + + private static void timeout(int seconds) { + try { + TimeUnit.SECONDS.sleep(seconds); + } catch (InterruptedException e) { + // do nothing + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/impl/TweetsStreamProvider.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/impl/TweetsStreamProvider.java new file mode 100644 index 0000000..cc4e514 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/providing/impl/TweetsStreamProvider.java @@ -0,0 +1,96 @@ +package core.providing.impl; + +import config.Arguments; +import config.Constants; +import config.TwitterConfig; +import core.handling.TweetHandler; +import core.providing.TweetsProvider; +import core.quering.FilterQueryBuilder; +import model.Tweet; +import twitter4j.*; +import twitter4j.auth.AccessToken; +import twitter4j.conf.Configuration; + +import java.util.concurrent.TimeUnit; + +/** + * Tweets stream provider, provides tweets to TweetHandler. + * + * This class implements StatusAdapter and registers itself + * as StatusListener in TwitterStream. + * Every time when new status obtained the transformation to + * internal Tweet object performed, after that this Tweet sent + * to the handler. + */ +public final class TweetsStreamProvider extends StatusAdapter implements TweetsProvider { + private Configuration configuration; + private AccessToken accessToken; + + private TwitterStream twitterStream; + private FilterQuery filterQuery; + + private TweetHandler tweetHandler; + + @Override + public void init(TwitterConfig twitterConfig) { + configuration = twitterConfig.getConfiguration(); + accessToken = twitterConfig.getAccessToken(); + } + + @Override + public void provide(TweetHandler handler) { + this.tweetHandler = handler; + connect(); + } + + @Override + public void onStatus(Status status) { + Tweet tweet = Tweet.valueOf(status); + if (Arguments.getInstance().hideRetweets() && tweet.isRetweet()) { + // skip this tweet + return; + } + tweetHandler.handle(tweet); + } + + @Override + public void onException(Exception e) { + System.err.println("Twitter stream has been occasionally crashed with error: \"" + e.getMessage() + "\""); + System.err.println("Try to reconnect... [timeout = " + Constants.RECONNECT_TIMEOUT_SECS + " secs]"); + reconnect(); + } + + private void connect() { + twitterStream = new TwitterStreamFactory(configuration).getInstance(accessToken); + Twitter twitter = new TwitterFactory(configuration).getInstance(); + filterQuery = new FilterQueryBuilder(twitter).buildQuery(); + // register itself as status listener + // method 'onStatus' will execute every time when new tweet obtained + twitterStream.addListener(this); + twitterStream.filter(filterQuery); + } + + private void disconnect() { + if (twitterStream != null) { + twitterStream.cleanUp(); + twitterStream.shutdown(); + // help GC + twitterStream = null; + } + } + + private void reconnect() { + // reconnect is performed with timeout between disconnect and new connect + disconnect(); + timeout(Constants.RECONNECT_TIMEOUT_SECS); + connect(); + } + + private static void timeout(int seconds) { + try { + TimeUnit.SECONDS.sleep(seconds); + } catch (InterruptedException e) { + // do nothing + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/quering/FilterQueryBuilder.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/quering/FilterQueryBuilder.java new file mode 100644 index 0000000..1b13f1a --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/quering/FilterQueryBuilder.java @@ -0,0 +1,55 @@ +package core.quering; + +import config.Arguments; +import utils.GeoUtils; +import twitter4j.*; + +/** + * FilterQuery builder to build query for Twitter searching + * based on arguments (query, place). + */ +public class FilterQueryBuilder { + private Twitter twitter; + + public FilterQueryBuilder(Twitter twiter) { + this.twitter = twiter; + } + + public final FilterQuery buildQuery() { + Arguments arguments = Arguments.getInstance(); + + FilterQuery query = new FilterQuery(); + query.track(arguments.getKeywords()); + + // calculate and set place for tweets + try { + Place place = GeoUtils.findPlaceByName(twitter, arguments.getPlace()); + GeoLocation[] vertices = place.getBoundingBoxCoordinates()[0]; + /* + * We try to find two points of box + * - the first with MIN latitude and longitude + * - the second with MAX latitude and longitude + * These two points will be used as filtering location for tweets. + * + * For additional details see: + * https://dev.twitter.com/streaming/overview/request-parameters#locations + * Checked with "New York City" ({-74,40},{-73,41}) + */ + double minLongitude = Double.MAX_VALUE, minLatitude = Double.MAX_VALUE; + double maxLongitude = -Double.MAX_VALUE, maxLatitude = -Double.MAX_VALUE; + for (GeoLocation vertex : vertices) { + minLongitude = Math.min(minLongitude, vertex.getLongitude()); + minLatitude = Math.min(minLatitude, vertex.getLatitude()); + maxLongitude = Math.max(maxLongitude, vertex.getLongitude()); + maxLatitude = Math.max(maxLatitude, vertex.getLatitude()); + } + double[][] locations = {{minLongitude, minLatitude}, {maxLongitude, maxLatitude}}; + query.locations(locations); + } catch (TwitterException te) { + System.err.println("Searching of places has been crashed with error = \"" + te.getMessage() + "\""); + System.err.println("Search query will be created without place condition."); + } + + return query; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/quering/SearchQueryBuilder.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/quering/SearchQueryBuilder.java new file mode 100644 index 0000000..3948ad3 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/core/quering/SearchQueryBuilder.java @@ -0,0 +1,94 @@ +package core.quering; + +import config.Arguments; +import config.Constants; +import utils.GeoUtils; +import twitter4j.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * SearchQuery builder to build query for tweets streaming + * based on arguments (keywords, place). + */ +public class SearchQueryBuilder { + private Twitter twitter; + + public SearchQueryBuilder(Twitter twiter) { + this.twitter = twiter; + } + + public final Query buildQuery() { + Arguments arguments = Arguments.getInstance(); + Query query = new Query(); + // set query for tweets + query.setQuery(arguments.getQuery()); + // set limit of tweets if specified in arguments + if (arguments.getLimitOfTweets() != Constants.NO_TWEETS_LIMIT) { + query.setCount(arguments.getLimitOfTweets()); + } + // calculate and set place for tweets + try { + Place place = GeoUtils.findPlaceByName(twitter, arguments.getPlace()); + GeoLocation[] vertices = place.getBoundingBoxCoordinates()[0]; + /* + * Implemented approach which described in the task: + * + * Для Twitter.search использовать среднее арифметическое широты и долготы + * Place.getBoundingBoxCoordinates() и радиус как половину максимального + * расстояния между точками. + */ + GeoLocation center = getCenter(vertices); + double radius = getRadius(vertices); + query.setGeoCode(center, radius, Query.Unit.mi); + } catch (TwitterException te) { + System.err.println("Searching of places has been crashed with error = \"" + te.getMessage() + "\""); + System.err.println("Search query will be created without place condition."); + } + + return query; + } + + public static GeoLocation getCenter(GeoLocation[] vertices) { + double centerLatitude, centerLongitude; + double[] latitudes = new double[vertices.length]; + double[] longitudes = new double[vertices.length]; + for (int i = 0; i < vertices.length; i++) { + GeoLocation vertex = vertices[i]; + latitudes[i] = vertex.getLatitude(); + longitudes[i] = vertex.getLongitude(); + } + centerLatitude = getArithmeticMean(latitudes); + centerLongitude = getArithmeticMean(longitudes); + return new GeoLocation(centerLatitude, centerLongitude); + } + + public static double getRadius(GeoLocation[] vertices) { + List distances = new ArrayList<>(); + // calculate distances between all vertices + for (int i = 0; i < vertices.length - 1; i++) { + for (int j = i + 1; j < vertices.length; j++) { + GeoLocation vertex1 = vertices[i]; + GeoLocation vertex2 = vertices[j]; + distances.add(GeoUtils.distanceBetweenTwoCoordinates( + vertex1.getLatitude(), vertex1.getLongitude(), + vertex2.getLatitude(), vertex2.getLongitude()) + ); + } + } + // sort list in reverse order (from MAX to MIN) + // and get the first - MAX of all distances + Collections.sort(distances, Collections.reverseOrder()); + return distances.get(0); + } + + public static double getArithmeticMean(double[] numbers) { + double sum = 0; + for (double num : numbers) { + sum += num; + } + return sum / numbers.length; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/Mode.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/Mode.java new file mode 100644 index 0000000..23dc708 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/Mode.java @@ -0,0 +1,8 @@ +package model; + +/** + * Working mode of application - as tweets stream or tweets by query. + */ +public enum Mode { + STREAM, QUERY; +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/Tweet.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/Tweet.java new file mode 100644 index 0000000..9ead43c --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/Tweet.java @@ -0,0 +1,80 @@ +package model; + +import twitter4j.Status; + +/** + * Class represents Tweet object. + */ +public final class Tweet { + private String text; + private TwitterUser author; + private long time; + private long retweetCount; + private Tweet retweetedTweet; + + public String getText() { + return text; + } + + public void setText(String tweetText) { + this.text = tweetText; + } + + public TwitterUser getAuthor() { + return author; + } + + public void setAuthor(TwitterUser tweetAuthor) { + this.author = tweetAuthor; + } + + public long getRetweetCount() { + return retweetCount; + } + + public void setRetweetCount(long count) { + this.retweetCount = count; + } + + public long getTime() { + return time; + } + + public void setTime(long tweetTime) { + this.time = tweetTime; + } + + public Tweet getRetweetedTweet() { + return retweetedTweet; + } + + public void setRetweetedTweet(Tweet retweeted) { + this.retweetedTweet = retweeted; + } + + public boolean isRetweet() { + return null != retweetedTweet; + } + + public boolean isNotRetweet() { + return !isRetweet(); + } + + /** + * Factory method to convert twitter4j.Status object to internal model - Tweet object. + * @param twitter4jStatus twitter4j.Status object + * @return Tweet object + */ + public static Tweet valueOf(Status twitter4jStatus) { + Tweet tweet = new Tweet(); + if (twitter4jStatus.getRetweetedStatus() != null) { + tweet.setRetweetedTweet(valueOf(twitter4jStatus.getRetweetedStatus())); + } + tweet.setAuthor(TwitterUser.valueOf(twitter4jStatus.getUser())); + tweet.setText(twitter4jStatus.getText()); + tweet.setTime(twitter4jStatus.getCreatedAt().getTime()); + tweet.setRetweetCount(twitter4jStatus.getRetweetCount()); + + return tweet; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/TwitterUser.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/TwitterUser.java new file mode 100644 index 0000000..a10e491 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/model/TwitterUser.java @@ -0,0 +1,42 @@ +package model; + +import twitter4j.User; + +/** + * Class represents Twitter User object. + */ +public class TwitterUser { + private Long id; + private String name; + + public TwitterUser(Long userid, String username) { + this.id = userid; + this.name = username; + } + + public final Long getId() { + return id; + } + + public final String getName() { + return name; + } + + /** + * Factory method to convert twitter4j.User object to internal model - TwitterUser object. + * @param twitter4jUser twitter4j.User object + * @return TwitterUser object + */ + public static TwitterUser valueOf(User twitter4jUser) { + return new TwitterUser(twitter4jUser.getId(), twitter4jUser.getName()); + } + + @Override + public final String toString() { + return "TwitterUser{" + + "id=" + id + + ", name='" + name + + '\'' + + '}'; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/utils/GeoUtils.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/utils/GeoUtils.java new file mode 100644 index 0000000..662507b --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/utils/GeoUtils.java @@ -0,0 +1,51 @@ +package utils; + +import twitter4j.*; + +import static java.lang.Math.*; + +public final class GeoUtils { + public static final int CONST1 = 60; + public static final double CONST2 = 1.1515; + + private GeoUtils() { } + + /* + * Finds twitter4j.Place object by place name. + * @param twitter initialized twitter instance + * @param placeName name of place to search + * @return found twitter4j.Place object + * @throws TwitterException + */ + public static Place findPlaceByName(Twitter twitter, String placeName) throws TwitterException { + GeoQuery geoQuery = new GeoQuery((String) null); + geoQuery.setQuery(placeName); + // we need the only one place + geoQuery.setMaxResults(1); + ResponseList places = twitter.searchPlaces(geoQuery); + if (places.isEmpty()) { + throw new IllegalArgumentException("Place by name = '" + placeName + "' not found"); + } + return places.get(0); + } + + /* + * Calculates distance between two points based on their longitude and latitude. + * @see + * + * @param lat1 latitude of point_1 + * @param lon1 longitude of point_1 + * @param lat2 latitude of point_2 + * @param lon2 longitude of point_2 + * @return distance in miles + */ + public static double distanceBetweenTwoCoordinates(double lat1, double lon1, double lat2, double lon2) { + double theta = lon1 - lon2; + double dist = sin(toRadians(lat1)) * sin(toRadians(lat2)) + + cos(toRadians(lat1)) * cos(toRadians(lat2)) * cos(toRadians(theta)); + dist = acos(dist); + dist = toDegrees(dist); + dist = dist * CONST1 * CONST2; + return dist; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/utils/TextUtils.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/utils/TextUtils.java new file mode 100644 index 0000000..e7f9304 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/java/utils/TextUtils.java @@ -0,0 +1,16 @@ +package utils; + +public final class TextUtils { + public static final String COLOR_RESET = "\u001B[0m"; + public static final String COLOR_BLUE = "\u001B[34m"; + + // to prevent instantiating + // this class must be used as static only + private TextUtils() { } + + public static String coloredText(String text, String color) { + return color + + text + + COLOR_RESET; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/resources/help.txt b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/resources/help.txt new file mode 100644 index 0000000..95008c4 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/src/main/resources/help.txt @@ -0,0 +1,11 @@ +TwitterStream - it's a console application to print the stream of tweets onto the screen +according to the specified conditions. + +Command usage: +java TwitterStream \ + [--query|-q ] \ + [--place|-p ] \ + [--stream|-s] \ + [--hideRetweets] \ + [--limit|-l ] \ + [--help|-h] diff --git "a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/~$\320\260\321\202\320\272\320\260\321\217 \320\270\320\275\321\201\321\202\321\200\321\203\320\272\321\206\320\270\321\217.txt" "b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/~$\320\260\321\202\320\272\320\260\321\217 \320\270\320\275\321\201\321\202\321\200\321\203\320\272\321\206\320\270\321\217.txt" new file mode 100644 index 0000000..0ebf8fb Binary files /dev/null and "b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/TwitterStream/~$\320\260\321\202\320\272\320\260\321\217 \320\270\320\275\321\201\321\202\321\200\321\203\320\272\321\206\320\270\321\217.txt" differ diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task1/Counter.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task1/Counter.java new file mode 100644 index 0000000..f1ab341 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task1/Counter.java @@ -0,0 +1,86 @@ +package threads.task1; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + + +public class Counter { + + private static final int DELAY_BETWEEN_COUNTER_SECONDS = 1; + + private static final int DEFAULT_COUNT_OF_THREADS = 5; + + public static void main(String[] args) throws InterruptedException { + final int countOfThreads; + if (args.length == 0) { + countOfThreads = DEFAULT_COUNT_OF_THREADS; + } else { + countOfThreads = Integer.parseInt(args[0]); + } + if (countOfThreads <= 1) { + throw new IllegalArgumentException("Count of threads must be > 1"); + } + + + ExecutorService executorService = Executors.newFixedThreadPool(countOfThreads); + final CountDownLatch initTracker = new CountDownLatch(countOfThreads); + final ReentrantLock lock = new ReentrantLock(); + + + Map conditions = new HashMap<>(countOfThreads); + for (int i = 1; i <= countOfThreads; i++) { + Condition iCondition = lock.newCondition(); + conditions.put(i, iCondition); + } + + + for (int i = 1; i <= countOfThreads; i++) { + + final int threadNum = i; + + final Condition myCondition = conditions.get(i); + + final Condition nextCondition; + if (i == countOfThreads) { + nextCondition = conditions.get(1); + } else { + nextCondition = conditions.get(i + 1); + } + executorService.execute(new Runnable() { + @Override + public void run() { + lock.lock(); + try { + + initTracker.countDown(); + while (true) { + + myCondition.await(); + System.out.println("Thread-" + threadNum); + TimeUnit.SECONDS.sleep(DELAY_BETWEEN_COUNTER_SECONDS); + + nextCondition.signal(); + } + } catch (InterruptedException e) { + // finish loop + } finally { + lock.unlock(); + } + } + }); + } + + + initTracker.await(); + try { + lock.lock(); + + conditions.get(1).signal(); + } finally { + lock.unlock(); + } + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task2/RollCall.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task2/RollCall.java new file mode 100644 index 0000000..4fd7af6 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task2/RollCall.java @@ -0,0 +1,82 @@ +package threads.task2; + +import java.util.concurrent.*; + +public class RollCall { + + private static final int DELAY_BETWEEN_CALL_SECONDS = 1; + + private static final int DEFAULT_COUNT_OF_THREADS = 10; + + private static final int CONST10 = 10; + + private static int tries; + + public static void main(String[] args) throws InterruptedException { + final int countOfThreads; + if (args.length == 0) { + countOfThreads = DEFAULT_COUNT_OF_THREADS; + } else { + countOfThreads = Integer.parseInt(args[0]); + } + if (countOfThreads <= 1) { + throw new IllegalArgumentException("Count of threads must be > 1"); + } + + while (true) { + tries++; + ExecutorService executorService = Executors.newFixedThreadPool(countOfThreads); + + final CountDownLatch initTracker = new CountDownLatch(countOfThreads); + final CyclicBarrier readyBarrier = new CyclicBarrier(countOfThreads, new Runnable() { + @Override + public void run() { + System.out.println("All ready with " + tries + " tries!"); + System.exit(1); + } + }); + + try { + System.out.println("\nAre you ready?"); + for (int i = 0; i < countOfThreads; i++) { + final int currentNum = i; + + executorService.execute(new Runnable() { + @Override + public void run() { + boolean yes = isReady(); + if (yes) { + System.out.println("Thread#" + (currentNum + 1) + ": Yes"); + initTracker.countDown(); + try { + readyBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // ignore exception + } + } else { + System.out.println("Thread#" + (currentNum + 1) + ": No"); + initTracker.countDown(); + } + } + }); + } + + initTracker.await(); + // continue with a small pause + TimeUnit.SECONDS.sleep(DELAY_BETWEEN_CALL_SECONDS); + } finally { + readyBarrier.reset(); + executorService.shutdown(); + } + } + } + + /** + * @return true - 90% false - 10% + */ + private static boolean isReady() { + int num = (int) (Math.random() * CONST10); + int anotherNum = (int) (Math.random() * CONST10); + return num != anotherNum; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task3/BlockingQueue.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task3/BlockingQueue.java new file mode 100644 index 0000000..10aec29 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task3/BlockingQueue.java @@ -0,0 +1,75 @@ +package threads.task3; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public final class BlockingQueue { + private int maxElements; + + private List queue; + + private ReentrantLock lock = new ReentrantLock(); + private Condition notFull = lock.newCondition(); + private Condition notEmpty = lock.newCondition(); + + public BlockingQueue(int max) { + if (maxElements <= 0) { + throw new IllegalArgumentException("maxElements must be positive"); + } + this.maxElements = max; + queue = new LinkedList<>(); + } + + + public void offer(List elements) { + lock.lock(); + try { + while (!checkEmptyEnough(elements.size())) { + notFull.await(); + } + + queue.addAll(elements); + } catch (InterruptedException e) { + // ignore + } finally { + + notEmpty.signal(); + + lock.unlock(); + } + } + + + private boolean checkEmptyEnough(int requiredSize) { + return (queue.size() + requiredSize) <= maxElements; + } + + + public List take(int countOfElements) { + lock.lock(); + try { + while (!checkFullEnough(countOfElements)) { + notEmpty.await(); + } + int lastElementIndex = queue.size(); + int firstElementIndex = lastElementIndex - countOfElements; + List result = new LinkedList<>(queue.subList(firstElementIndex, lastElementIndex)); + queue.removeAll(result); + return result; + } catch (InterruptedException e) { + // ignore + return Collections.emptyList(); + } finally { + + notFull.signal(); + lock.unlock(); + } + } + + private boolean checkFullEnough(int countOfElements) { + return queue.size() >= countOfElements; + } +} diff --git a/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task3/BlockingQueueMain.java b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task3/BlockingQueueMain.java new file mode 100644 index 0000000..0179b62 --- /dev/null +++ b/projects/liza22/src/main/java/ru/mipt/diht/students/liza22/threads/task3/BlockingQueueMain.java @@ -0,0 +1,125 @@ +package threads.task3; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class BlockingQueueMain { + public static final int COUNT = 10; + public static final int N1 = 1; + public static final int N2 = 2; + public static final int N3 = 3; + public static final int N4 = 4; + public static final int N5 = 5; + public static final int N6 = 6; + public static final int N7 = 7; + public static final int N8 = 8; + public static final int N9 = 9; + public static final int N10 = 10; + + public static void main(String[] args) throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + + try { + System.out.println("\nFirst case - consumer blocked while provider will offer enough elements to queue"); + final BlockingQueue blockingQueue1 = new BlockingQueue<>(5); + final CountDownLatch firstCaseInitTracker = new CountDownLatch(2); + executorService.execute(new Runnable() { + @Override + public void run() { + System.out.println("CONSUMER: try to take 5 elements from queue..."); + List elements = blockingQueue1.take(N5); + System.out.println("CONSUMER: elements taken: " + elements); + firstCaseInitTracker.countDown(); + } + }); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + System.out.println("PROVIDER: sleep 3 seconds before offering elements to queue..."); + TimeUnit.SECONDS.sleep(N3); + System.out.println("PROVIDER: offer 5 elements to queue..."); + blockingQueue1.offer(Arrays.asList(N1, N2, N3, N4, N5)); + firstCaseInitTracker.countDown(); + } catch (InterruptedException e) { + // ignore + } + } + }); + firstCaseInitTracker.await(); + + System.out.println("\nSecond case - provider blocked while consumer will take extra elements from queue"); + final BlockingQueue blockingQueue2 = new BlockingQueue<>(5); + final CountDownLatch secondCaseInitTracker = new CountDownLatch(2); + executorService.execute(new Runnable() { + @Override + public void run() { + System.out.println("PROVIDER: offer 1-5 elements to queue..."); + blockingQueue2.offer(Arrays.asList(N1, N2, N3, N4, N5)); + System.out.println("PROVIDER: 1-5 element offered!"); + System.out.println("PROVIDER: offer 5-10 elements to queue..."); + blockingQueue2.offer(Arrays.asList(N6, N7, N8, N9, N10)); + System.out.println("PROVIDER: 5-10 element offered!"); + secondCaseInitTracker.countDown(); + } + }); + executorService.execute(new Runnable() { + @Override + public void run() { + System.out.println("CONSUMER: sleep 3 seconds before take elements from queue..."); + try { + TimeUnit.SECONDS.sleep(N3); + } catch (InterruptedException e) { + // ignore + } + System.out.println("CONSUMER: take 5 elements from queue..."); + System.out.println("CONSUMER: Taken elements: " + blockingQueue2.take(N5)); + secondCaseInitTracker.countDown(); + } + }); + secondCaseInitTracker.await(); + + System.out.println("\nThird case - " + + "provider blocked while consumer will take extra elements from queue one by one"); + final BlockingQueue blockingQueue3 = new BlockingQueue<>(3); + final CountDownLatch thirdCaseInitTracker = new CountDownLatch(2); + executorService.execute(new Runnable() { + @Override + public void run() { + System.out.println("PROVIDER: offer 10 elements to queue one by one..."); + for (int i = 1; i <= COUNT; i++) { + System.out.println("PROVIDER: offer [" + i + "] element to queue..."); + blockingQueue3.offer(Collections.singletonList(i)); + } + System.out.println("PROVIDER: all 10 elements offered to queue!"); + thirdCaseInitTracker.countDown(); + } + }); + executorService.execute(new Runnable() { + @Override + public void run() { + System.out.println("CONSUMER: take 10 elements from queue one by one..."); + for (int i = 1; i <= COUNT; i++) { + System.out.println("CONSUMER: taken " + blockingQueue3.take(1) + " element from queue..."); + // sleep 1 second before next taking... + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + // ignore + } + } + System.out.println("CONSUMER: all 10 elements are taken from queue!"); + thirdCaseInitTracker.countDown(); + } + }); + thirdCaseInitTracker.await(); + } finally { + executorService.shutdown(); + } + } +} diff --git a/projects/liza22/src/test/java/ru/mipt/diht/students/AppTest.java b/projects/liza22/src/test/java/ru/mipt/diht/students/AppTest.java new file mode 100644 index 0000000..4bb8dd4 --- /dev/null +++ b/projects/liza22/src/test/java/ru/mipt/diht/students/AppTest.java @@ -0,0 +1,38 @@ +package ru.mipt.diht.students; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} diff --git a/projects/pom.xml b/projects/pom.xml index 5d14966..5fc3c22 100644 --- a/projects/pom.xml +++ b/projects/pom.xml @@ -1,5 +1,5 @@ - + + 4.0.0 ru.mipt.diht.students @@ -30,7 +30,8 @@ dkhurtin ale3otik Pitovsky - + liza22 + @@ -133,4 +134,4 @@ - + \ No newline at end of file