/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ publicstatic ExecutorService newSingleThreadExecutor(){ returnnew FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) thrownew IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) thrownew NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ //用于拒绝任务的处理程序,可以直接在{@code execute}方法的调用线程中运 //行被拒绝的任务*除非执行器已被关闭,否则这种情况下该任务将被丢弃。 //简单点就是谁调用谁执行 publicstaticclassCallerRunsPolicyimplementsRejectedExecutionHandler{ /** * Creates a {@code CallerRunsPolicy}. */ publicCallerRunsPolicy(){ }
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e){ if (!e.isShutdown()) { r.run(); } } } /** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ //直接抛异常 publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{ /** * Creates an {@code AbortPolicy}. */ publicAbortPolicy(){ }
/** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e){ thrownew RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
/** * A handler for rejected tasks that silently discards the * rejected task. */ //直接放弃 publicstaticclassDiscardPolicyimplementsRejectedExecutionHandler{ /** * Creates a {@code DiscardPolicy}. */ publicDiscardPolicy(){ }
/** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e){ } }
/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ //放弃最旧未处理的程序 publicstaticclassDiscardOldestPolicyimplementsRejectedExecutionHandler{ /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ publicDiscardOldestPolicy(){ }
/** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e){ if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!
System.out.println(Runtime.getRuntime().availableProcessors()); //ThreadPoolExecutor的构造方法 /** * 七大参数 * public ThreadPoolExecutor(int corePoolSize, * int maximumPoolSize, * long keepAliveTime, * TimeUnit unit, * BlockingQueue<Runnable> workQueue, * ThreadFactory threadFactory) { * this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, * threadFactory, defaultHandler); * } * * * 四种拒绝策略 * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. 不处理,抛异常 * AbortPolicy * * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. 哪来的去哪里! * 用于拒绝任务的处理程序,可以直接在方法的调用线程中运行被拒绝的任务*除非执行器已被关闭,否则这种情况下该任务将被丢弃。 * CallerRunsPolicy * * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. * 拒绝任务的处理程序,它丢弃<p>最旧</p>的未处理*请求,然后重试,除非执行程序*被关闭,在这种情况下,该任务将被丢弃。 * DiscardOldestPolicy * * A handler for rejected tasks that silently discards the * rejected task. 直接抛弃 * DiscardPolicy * * * * * */ ExecutorService threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), //阻塞队列实现 Executors.defaultThreadFactory(), //默认线程工厂 new ThreadPoolExecutor.DiscardPolicy() //拒绝策略 );
/** * Represents a function that accepts one argument and produces a result. * * <p>This is a <a href="package-summary.html">functional interface</a> * whose functional method is {@link #apply(Object)}. * * @param <T> the type of the input to the function * @param <R> the type of the result of the function * * @since 1.8 */ @FunctionalInterface publicinterfaceFunction<T, R> {
/** * Applies this function to the given argument. * * @param t the function argument * @return the function result */ R apply(T t);
/** * Represents a predicate (boolean-valued function) of one argument. * * <p>This is a <a href="package-summary.html">functional interface</a> * whose functional method is {@link #test(Object)}. * * @param <T> the type of the input to the predicate * * @since 1.8 */ @FunctionalInterface publicinterfacePredicate<T> {
/** * Evaluates this predicate on the given argument. * * @param t the input argument * @return {@code true} if the input argument matches the predicate, * otherwise {@code false} */ booleantest(T t);
/** * Represents an operation that accepts a single input argument and returns no * result. Unlike most other functional interfaces, {@code Consumer} is expected * to operate via side-effects. * * <p>This is a <a href="package-summary.html">functional interface</a> * whose functional method is {@link #accept(Object)}. * * @param <T> the type of the input to the operation * * @since 1.8 */ @FunctionalInterface publicinterfaceConsumer<T> {
/** * Performs this operation on the given argument. * * @param t the input argument */ voidaccept(T t);
/** * Represents a supplier of results. * * <p>There is no requirement that a new or distinct result be returned each * time the supplier is invoked. * * <p>This is a <a href="package-summary.html">functional interface</a> * whose functional method is {@link #get()}. * * @param <T> the type of results supplied by this supplier * * @since 1.8 */ @FunctionalInterface publicinterfaceSupplier<T> {
/** * Gets a result. * * @return a result */ T get(); }
/** * 题目要求:一分钟内完成此题,只能用一行代码实现! * 现在有5个用户!筛选: * 1、ID 必须是偶数 * 2、年龄必须大于23岁 * 3、用户名转为大写字母 * 4、用户名字母倒着排序 * 5、只输出一个用户! */ publicclassTest{ publicstaticvoidmain(String[] args){ User u1 = new User(1,"a",21); User u2 = new User(2,"b",22); User u3 = new User(3,"c",23); User u4 = new User(4,"d",24); User u5 = new User(6,"e",25);
//1.直接for循环求 publicstaticvoidtest1(){ Long sum = 0L; long start = System.currentTimeMillis(); for (Long i = 1L; i <= 10_0000_0000; i++) { sum +=i; } long end = System.currentTimeMillis(); System.out.println("sum="+sum+" 花费时间:"+(end-start)); }
//2.forJoin publicstaticvoidtest2()throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum="+sum+" 花费时间:"+(end-start)); }
//3.Stream并行流操作 publicstaticvoidtest3(){ long start = System.currentTimeMillis(); //Stream 并行流 Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0,Long::sum); long end = System.currentTimeMillis(); System.out.println("sum="+sum+" 花费时间:"+(end-start)); } }